Spark Connect Dotnet Variant Data Type
I recently published the latest version of the Spark Connect Dotnet library, which includes support for the new Variant data type in Apache Spark 4.0, here. One of the new features of Spark 4.0 is the Variant data type, which is a faster way of processing JSON data (see here).
Sample data
For this post I used a copy of the sample data from Adobe (https://opensource.adobe.com/Spry/samples/data_region/JSONDataSetSample.html).
Creating the Variant Column
The first thing we need to do is get the data into Spark. Normally you would have a file and do a spark.Read on the JSON file; in this example I will use a hard-coded string and CreateDataFrame:
var jsonData = @"
[
    {
        ""id"": ""0001"",
        ""type"": ""donut"",
        ""name"": ""Cake"",
        ""ppu"": 0.55,
        ""batters"":
        {
            ""batter"":
            [
                { ""id"": ""1001"", ""type"": ""Regular"" },
                { ""id"": ""1002"", ""type"": ""Chocolate"" },
                { ""id"": ""1003"", ""type"": ""Blueberry"" },
                { ""id"": ""1004"", ""type"": ""Devil's Food"" }
            ]
        },
        ""topping"":
        [
            { ""id"": ""5001"", ""type"": ""None"" },
            { ""id"": ""5002"", ""type"": ""Glazed"" },
            { ""id"": ""5005"", ""type"": ""Sugar"" },
            { ""id"": ""5007"", ""type"": ""Powdered Sugar"" },
            { ""id"": ""5006"", ""type"": ""Chocolate with Sprinkles"" },
            { ""id"": ""5003"", ""type"": ""Chocolate"" },
            { ""id"": ""5004"", ""type"": ""Maple"" }
        ]
    },
    {
        ""id"": ""0002"",
        ""type"": ""donut"",
        ""name"": ""Raised"",
        ""ppu"": 0.55,
        ""batters"":
        {
            ""batter"":
            [
                { ""id"": ""1001"", ""type"": ""Regular"" }
            ]
        },
        ""topping"":
        [
            { ""id"": ""5001"", ""type"": ""None"" },
            { ""id"": ""5002"", ""type"": ""Glazed"" },
            { ""id"": ""5005"", ""type"": ""Sugar"" },
            { ""id"": ""5003"", ""type"": ""Chocolate"" },
            { ""id"": ""5004"", ""type"": ""Maple"" }
        ]
    },
    {
        ""id"": ""0003"",
        ""type"": ""donut"",
        ""name"": ""Old Fashioned"",
        ""ppu"": 0.55,
        ""batters"":
        {
            ""batter"":
            [
                { ""id"": ""1001"", ""type"": ""Regular"" },
                { ""id"": ""1002"", ""type"": ""Chocolate"" }
            ]
        },
        ""topping"":
        [
            { ""id"": ""5001"", ""type"": ""None"" },
            { ""id"": ""5002"", ""type"": ""Glazed"" },
            { ""id"": ""5003"", ""type"": ""Chocolate"" },
            { ""id"": ""5004"", ""type"": ""Maple"" }
        ]
    }
]
";
var id = "adobeData";
var df = spark.CreateDataFrame(new (object, object)[]{(jsonData, id),}, "json", "id");
In this instance CreateDataFrame takes a tuple per row, so we are creating a single row that looks like this:
+--------------------+---------+
| json| id|
+--------------------+---------+
|\n[\n\t{\n\t\t"id...|adobeData|
+--------------------+---------+
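Because CreateDataFrame takes an array of tuples, each additional tuple becomes another row. A minimal sketch, where the second JSON string and its id are hypothetical placeholders just to illustrate the shape:
// Each tuple becomes one row; otherJson/"extraData" are made-up placeholders.
var otherJson = @"[{""id"": ""0004"", ""type"": ""donut"", ""name"": ""Jelly""}]";
var multiRowDf = spark.CreateDataFrame(new (object, object)[]
{
    (jsonData, "adobeData"),
    (otherJson, "extraData")
}, "json", "id");
multiRowDf.Show();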
ParseJson
The first new function we will look at is ParseJson, which takes a JSON string and converts it to a Variant type:
df = df.Select(ParseJson(Col("json")).Alias("variant_data"));
df.Show();
+--------------------+
| variant_data|
+--------------------+
|[{"batters":{"bat...|
+--------------------+
and the schema:
df.PrintSchema();
root
|-- variant_data: variant (nullable = true)
Ooh nice, that isn't a string but an actual Variant. Let's prove we understand it in .NET:
foreach (var schemaField in df.Schema.Fields)
{
Console.WriteLine($"{schemaField.Name} of type {schemaField.DataType.TypeName}");
}
variant_data of type variant
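While we are poking at the variant, Spark 4.0 also has a schema_of_variant function that returns the schema Spark infers for a variant value. Assuming the library exposes it under the usual PascalCase name SchemaOfVariant, we could peek at the inferred structure like this:
// Peek at the schema Spark infers for the variant value.
// SchemaOfVariant is an assumption based on the library's PascalCase naming.
df.Select(SchemaOfVariant(Col("variant_data")).Alias("inferred_schema")).Show();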
Lovely, we have a variant from the JSON. Let's extract some data. Firstly we have VariantGet, which allows us to extract a field from the variant:
df.Select(
VariantGet(df["variant_data"], "$[0].batters.batter[0].id").Alias("batterId"),
VariantGet(df["variant_data"], "$[0].batters.batter[0].type").Alias("batterType"),
VariantGet(df["variant_data"], "$[0].id").Alias("id")
).Show();
+--------+----------+------+
|batterId|batterType| id|
+--------+----------+------+
| "1001"| "Regular"|"0001"|
+--------+----------+------+
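Note the quotes around the values: VariantGet returns variant-typed results. If you want concrete types, you can cast them; a sketch, assuming the library's Column offers the usual Cast method:
// Cast the variant results to concrete types so the strings lose
// their quotes and ppu comes back as a double rather than a variant.
df.Select(
    VariantGet(df["variant_data"], "$[0].name").Cast("string").Alias("name"),
    VariantGet(df["variant_data"], "$[0].ppu").Cast("double").Alias("ppu")
).Show();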
Now this is good, but the data we have is an array, and although VariantGet allows us to extract the data, we can't extract all the rows. For that we need variant_explode, which is currently only available in Spark SQL; it isn't available in either the Scala or PySpark API, so we will need to create a view of our table and use spark.Sql against that view:
df.CreateOrReplaceTempView("data");
spark.Sql(@"
SELECT u.* FROM variant_explode((SELECT variant_data FROM data)) AS t, LATERAL variant_explode(t.value) as u
").Show();
+---+-------+--------------------+
|pos| key| value|
+---+-------+--------------------+
| 0|batters|{"batter":[{"id":...|
| 1| id| "0001"|
| 2| name| "Cake"|
| 3| ppu| 0.55|
| 4|topping|[{"id":"5001","ty...|
| 5| type| "donut"|
| 0|batters|{"batter":[{"id":...|
| 1| id| "0002"|
| 2| name| "Raised"|
| 3| ppu| 0.55|
+---+-------+--------------------+
only showing top 10 rows
And there we have it: the data extracted from the variant.
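Because variant_explode hands back key/value pairs, we can also filter down to just the fields we care about, for example every donut's name:
spark.Sql(@"
SELECT u.value AS name
FROM variant_explode((SELECT variant_data FROM data)) AS t,
     LATERAL variant_explode(t.value) AS u
WHERE u.key = 'name'
").Show();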
As soon as the variant_explode function is callable from the DataFrame API I will add it to the library. If you need it before then, this is another reminder that you can implement any methods you need by making the gRPC calls yourself, or by using the helpers in the Spark Connect Dotnet library, something like this:
df.Select(
new Column(new Expression
{
UnresolvedFunction = new Expression.Types.UnresolvedFunction
{
FunctionName = "variant_explode", IsDistinct = false, IsUserDefinedFunction = false, Arguments = { Col("variant_data").Expression }
}
})
).Show();
Currently this fails with:
Unhandled exception. Grpc.Core.RpcException: Status(StatusCode="Internal", Detail="[UNRESOLVED_ROUTINE] Cannot resolve routine `variant_explode` on search path [`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`]. SQLSTATE: 42883")
But when the function is available it should work fine. Happy varianting!