Spark Connect Dotnet Variant Data Type


I recently published the latest version of the Spark Connect Dotnet library, which includes support for the new Variant data type in Apache Spark 4.0. One of the new features of Spark 4.0 is the Variant data type, which is a faster way of processing JSON data.

Sample data

For this post I used a copy of the sample data from Adobe (https://opensource.adobe.com/Spry/samples/data_region/JSONDataSetSample.html).

Creating the Variant Column

The first thing we need to do is get the data into Spark. Normally you would have a file and do a spark.Read on the JSON file; in this example I will use a hard-coded string and CreateDataFrame:

var jsonData = @"
[
	{
		""id"": ""0001"",
		""type"": ""donut"",
		""name"": ""Cake"",
		""ppu"": 0.55,
		""batters"":
			{
				""batter"":
					[
						{ ""id"": ""1001"", ""type"": ""Regular"" },
						{ ""id"": ""1002"", ""type"": ""Chocolate"" },
						{ ""id"": ""1003"", ""type"": ""Blueberry"" },
						{ ""id"": ""1004"", ""type"": ""Devil's Food"" }
					]
			},
		""topping"":
			[
				{ ""id"": ""5001"", ""type"": ""None"" },
				{ ""id"": ""5002"", ""type"": ""Glazed"" },
				{ ""id"": ""5005"", ""type"": ""Sugar"" },
				{ ""id"": ""5007"", ""type"": ""Powdered Sugar"" },
				{ ""id"": ""5006"", ""type"": ""Chocolate with Sprinkles"" },
				{ ""id"": ""5003"", ""type"": ""Chocolate"" },
				{ ""id"": ""5004"", ""type"": ""Maple"" }
			]
	},
	{
		""id"": ""0002"",
		""type"": ""donut"",
		""name"": ""Raised"",
		""ppu"": 0.55,
		""batters"":
			{
				""batter"":
					[
						{ ""id"": ""1001"", ""type"": ""Regular"" }
					]
			},
		""topping"":
			[
				{ ""id"": ""5001"", ""type"": ""None"" },
				{ ""id"": ""5002"", ""type"": ""Glazed"" },
				{ ""id"": ""5005"", ""type"": ""Sugar"" },
				{ ""id"": ""5003"", ""type"": ""Chocolate"" },
				{ ""id"": ""5004"", ""type"": ""Maple"" }
			]
	},
	{
		""id"": ""0003"",
		""type"": ""donut"",
		""name"": ""Old Fashioned"",
		""ppu"": 0.55,
		""batters"":
			{
				""batter"":
					[
						{ ""id"": ""1001"", ""type"": ""Regular"" },
						{ ""id"": ""1002"", ""type"": ""Chocolate"" }
					]
			},
		""topping"":
			[
				{ ""id"": ""5001"", ""type"": ""None"" },
				{ ""id"": ""5002"", ""type"": ""Glazed"" },
				{ ""id"": ""5003"", ""type"": ""Chocolate"" },
				{ ""id"": ""5004"", ""type"": ""Maple"" }
			]
	}
]
";

var id = "adobeData";

var df = spark.CreateDataFrame(new (object, object)[]{(jsonData, id),}, "json", "id");

In this instance CreateDataFrame takes an array of tuples, one tuple per row, so we are creating a single row that looks like this:

+--------------------+---------+
|                json|       id|
+--------------------+---------+
|\n[\n\t{\n\t\t"id...|adobeData|
+--------------------+---------+
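
As an aside, if the JSON lived in a file you could skip the hard-coded string entirely. This is a minimal sketch, assuming the library's reader surface mirrors PySpark's (the path is hypothetical); Spark's "wholetext" option reads the whole file as a single row rather than one row per line:

// hypothetical path; "wholetext" gives us one row with a single string
// column ("value"), just like the hard-coded version above
var dfFromFile = spark.Read.Option("wholetext", "true").Text("/tmp/donuts.json");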

ParseJson

The first new function we will look at is ParseJson, which takes a JSON string and converts it to a Variant:

df = df.Select(ParseJson(Col("json")).Alias("variant_data"));
df.Show();
+--------------------+
|        variant_data|
+--------------------+
|[{"batters":{"bat...|
+--------------------+

and the schema:

df.PrintSchema();
root
 |-- variant_data: variant (nullable = true)

Ooh nice, that isn't a string but an actual Variant. Let's prove we understand it in .NET:

foreach (var schemaField in df.Schema.Fields)
{
	Console.WriteLine($"{schemaField.Name} of type {schemaField.DataType.TypeName}");
}
variant_data of type variant
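
Spark 4.0 also ships schema_of_variant, which reports the shape Spark inferred inside the variant, and to_json can turn a variant back into a plain string. A quick sketch; the SchemaOfVariant and ToJson names assume the library surfaces these like its other functions:

// what does Spark think is inside the variant?
df.Select(SchemaOfVariant(Col("variant_data")).Alias("inferred_schema")).Show();

// and back to a plain JSON string
df.Select(ToJson(Col("variant_data")).Alias("as_json")).Show();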

Lovely, we have a variant from the JSON. Let's extract some data. First we have VariantGet, which allows us to extract a field from the variant:

df.Select(
		VariantGet(df["variant_data"], "$[0].batters.batter[0].id").Alias("batterId"),
		VariantGet(df["variant_data"], "$[0].batters.batter[0].type").Alias("batterType"),
		VariantGet(df["variant_data"], "$[0].id").Alias("id")
	).Show();
+--------+----------+------+
|batterId|batterType|    id|
+--------+----------+------+
|  "1001"| "Regular"|"0001"|
+--------+----------+------+
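
Notice the values come back quoted: VariantGet returns variants, not strings. If you want properly typed columns you can cast the result; a sketch, assuming Column.Cast works here as it does on other columns:

df.Select(
		VariantGet(df["variant_data"], "$[0].batters.batter[0].id").Cast("string").Alias("batterId"),
		VariantGet(df["variant_data"], "$[0].ppu").Cast("double").Alias("ppu")
	).Show();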

Now this is good, but our data is an array, and although VariantGet allows us to extract individual fields, it can't give us every element as its own row. For that we need variant_explode, which is currently only available in Spark SQL; it isn't in either the Scala or PySpark API, so we will create a view of our table and run spark.Sql against that view:

df.CreateOrReplaceTempView("data");

spark.Sql(@"
	SELECT u.* FROM variant_explode((SELECT variant_data FROM data)) AS t, LATERAL variant_explode(t.value) as u
").Show();

+---+-------+--------------------+
|pos|    key|               value|
+---+-------+--------------------+
|  0|batters|{"batter":[{"id":...|
|  1|     id|              "0001"|
|  2|   name|              "Cake"|
|  3|    ppu|                0.55|
|  4|topping|[{"id":"5001","ty...|
|  5|   type|             "donut"|
|  0|batters|{"batter":[{"id":...|
|  1|     id|              "0002"|
|  2|   name|            "Raised"|
|  3|    ppu|                0.55|
+---+-------+--------------------+
only showing top 10 rows

And there we have it: the data extracted from the variant.
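
You can also pull typed values straight out of each exploded element. A sketch using SQL's three-argument variant_get, which takes a target type and so avoids the quoting we saw earlier:

spark.Sql(@"
	SELECT variant_get(t.value, '$.name', 'string') AS name,
	       variant_get(t.value, '$.ppu', 'double') AS ppu
	FROM variant_explode((SELECT variant_data FROM data)) AS t
").Show();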

As soon as variant_explode is callable as a function I will add it to the library. If you need it before then, this is another reminder that you can implement any method you need by making the gRPC calls yourself or by using the helpers in the Spark Connect Dotnet library, something like this:

df.Select(
	new Column(new Expression
	{
		// build the call by name; the server resolves the function at analysis time
		UnresolvedFunction = new Expression.Types.UnresolvedFunction
		{
			FunctionName = "variant_explode", IsDistinct = false, IsUserDefinedFunction = false, Arguments = { Col("variant_data").Expression }
		}
	})
).Show();

Currently this fails with:

Unhandled exception. Grpc.Core.RpcException: Status(StatusCode="Internal", Detail="[UNRESOLVED_ROUTINE] Cannot resolve routine `variant_explode` on search path [`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`]. SQLSTATE: 42883")

but when the function is available server-side it should work fine. Happy varianting!