Exploring the Spark Connect gRPC API more

All Spark Connect Posts

Code

Goal of this post

In this post we will continue looking at the gRPC API, specifically the AnalyzePlan method, which takes a plan and analyzes it. To be honest, I expected this post to be longer but decided just to cover AnalyzePlan. There are a few more APIs, like ReleaseExecute, InterruptAsync, and ReattachExecute, that I was going to cover but changed my mind, so consider this part of the last post :).
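To give a flavour of what that looks like from C#, here is a rough sketch of asking AnalyzePlan for the schema of a plan. The message and property names are what I recall from base.proto in the Spark repo, so treat them as assumptions rather than a definitive implementation:

// Sketch only: ask the server for the schema of a plan we have already built.
// Assumes the generated Spark.Connect namespace, a SparkConnectServiceClient called
// 'client', the session GUID in 'sessionId' and a Spark.Connect.Plan in 'plan'.
var request = new AnalyzePlanRequest
{
    SessionId = sessionId,
    Schema = new AnalyzePlanRequest.Types.Schema { Plan = plan }
};

// AnalyzePlan is a unary call (unlike ExecutePlan, which streams responses back)
var response = client.AnalyzePlan(request);

// The response carries a DataType describing the columns the plan would produce
Console.WriteLine(response.Schema.Schema);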

Exploring the Spark Connect gRPC API

All Spark Connect Posts

Code

Goal of this post

In the first two posts, we looked at how to run some Spark code, firstly against a local Spark Connect server and then against a Databricks cluster. In this post, we will look more at the actual gRPC API itself, namely ExecutePlan, Config, and AddArtifacts/ArtifactsStatus.

SparkConnectService.SparkConnectServiceClient

The way we call the API is using the SparkConnectServiceClient. When we take the .proto files from the Spark repo and add them to a Visual Studio project with the Protobuf build action that comes from the Grpc.Tools NuGet package, the SparkConnectService is generated as C# code.
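As a hedged example of what that gives us, creating the generated client against a local Spark Connect server looks something like this; the Spark.Connect namespace and the default port 15002 are assumptions on my part rather than anything from this post:

// Sketch: open a gRPC channel to a local Spark Connect server and create the generated client.
using Grpc.Net.Client;
using Spark.Connect;   // namespace generated from the .proto files (assumption)

var channel = GrpcChannel.ForAddress("http://localhost:15002");   // 15002 is the default Spark Connect port
var client = new SparkConnectService.SparkConnectServiceClient(channel);

// every request we send needs a session id so the server can tie our calls together
var sessionId = Guid.NewGuid().ToString();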

Using Spark Connect from .NET to run Spark jobs on Databricks

All Spark Connect Posts

Goal of this post

This post aims to show how we can create a .NET application, deploy it to Databricks, and then run a Databricks job that calls our .NET code, which uses Spark Connect to run a Spark job on the Databricks job cluster to write some data out to Azure storage.

In the previous post, I showed how to use the Range command to create a Spark DataFrame and then save it locally as a parquet file. In this post, we will use the Sql command, which will return a DataFrame or, in our world, a Relation. We will then pass that relation to a WriteOperation command, which will write the results of the Sql out to Azure storage.
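As a rough sketch of the shape of the messages involved (the field names below come from my reading of the Spark Connect .proto files and the storage path is made up, so treat all of it as an assumption rather than the post's actual code):

// Sketch only: build a Relation from a SQL query, wrap it in a WriteOperation and send
// the whole thing to the server as a Plan. Assumes 'client' is the generated
// SparkConnectServiceClient and 'sessionId' is the session GUID.
using Grpc.Core;   // for ReadAllAsync on the response stream

var sqlRelation = new Relation
{
    Sql = new SQL { Query = "SELECT id, id * 2 AS doubled FROM range(100)" }
};

var write = new WriteOperation
{
    Input = sqlRelation,
    Source = "parquet",
    Path = "abfss://container@account.dfs.core.windows.net/output",   // hypothetical path
    Mode = WriteOperation.Types.SaveMode.Overwrite
};

var plan = new Plan { Command = new Command { WriteOperation = write } };

// ExecutePlan is a server-streaming call; read responses until the write has finished
using var call = client.ExecutePlan(new ExecutePlanRequest { SessionId = sessionId, Plan = plan });
await foreach (var response in call.ResponseStream.ReadAllAsync())
{
    // each response can carry metrics or result batches; for a write we just drain the stream
}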

Spark Connect Dotnet First Major Milestone

All Spark Connect Posts

When I wrote the spark-connect-dotnet lib, I didn’t envisage that I would implement every function; instead, the plan was a combination of implementing the most common functionality and showing people how they could make their own gRPC calls for the rest of the functions. What I found, though, was that once I had figured out the shared functionality, implementing the functions was actually pretty easy, and by implementing all of them I was able to get the supporting pieces, like collecting data back through Arrow, working.

Using Spark Connect from .NET

All Spark Connect Posts

Introductory Ramble

Spark Connect

In July 2022, at the Data and AI summit, Apache Spark announced “Spark Connect,” which was a way of connecting to Apache Spark using the gRPC protocol rather than using the supplied Java or Python APIs. I’ve been using Spark since 2017 and have been completely obsessed with how transformative it has been in the data processing space.

Me

Over the past couple of decades working in IT, I have found a particular interest in protocols. When I was learning how MSSQL worked, I spent a while figuring out how to read data from disk via backups rather than via the database server (MS Tape Format, if anyone cared). I spent more time than anyone should learning how to parse TDS (before the [MS-TDS] documentation was a thing); having my head buried in a set of network traces with a pencil and paper has given me more pleasure than I can tell you.

ADF: Error trying to debug pipeline: BadRequest

I made a mistake recently when I was creating an ADF pipeline. Annoyingly, I made loads of changes and then clicked the debug button; when I pressed debug, the pipeline failed to start and I was presented with this little beaut of an error message:

Error debugging pipeline

The pipeline was quite complicated, so I didn’t know exactly what was causing the failure. I went through the usual ADF troubleshooting steps (save all, then refresh the web page), but that didn’t help. The thought of doing the “binary chop” on my pipeline was not fun, especially as I use a MacBook and, annoyingly, ctrl+z doesn’t undo (nor does cmd+z).

ADF: Querying JSON documents

In my previous blog post I talked about how to read from an XML web service and use XPath to query the XML on the expression side of things. You can read the XML article here (https://the.agilesql.club/2021/02/adf-xml-objects-and-xpath-in-the-expression-language/). Now, what if we don’t have XML but have JSON? Imagine if there were a “JSONQuery” that let you pass a query, much like an XPath query, to retrieve specific values from a JSON document. That would be awesome, wouldn’t it? Well, it isn’t available here; there is no support for JSONQuery in ADF. However, we can create an XML document from a JSON document and then query that, as long as we are very, very careful about how we pass the JSON document to the xml function.

ADF, XML objects and XPath in the expression language

When you use ADF, there are two sides to the coin. The first is the data itself, which ADF handles very well: moving it from one site to another, flattening JSON documents, and converting from CSV to Avro, to Parquet, to SQL. The other side of the coin is how ADF uses data as variables to manage the pipeline, and it is this side of the coin that I wish to talk about today.

Synapse Analytics and .NET for Apache Spark Example 4 - JOINS

This is a bit of a longer one: a look at how to do all the different joins. The exciting thing for MSSQL developers is that we get a couple of extra joins (semi and anti semi, oooooooh).
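As a taster of one of those extra joins (not from the original examples, just a sketch following the same pattern as the joins below), a left semi join in the DataFrame API looks like this; swap the join type for "left_anti" to get the anti semi join:

var dataFrame = spark.Read().Table("chicago.safety_data");
// returns the rows from "one" that have at least one match in "two", with no columns from "two"
dataFrame.Alias("one")
            .Join(
                    dataFrame.Alias("two")
                    , Functions.Col("one.Address") == Functions.Col("two.Address")
                    , "left_semi").Show();  //alternative "semi" or "leftsemi"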

T-SQL

SELECT * FROM 
    chicago.safety_data one 
INNER JOIN chicago.safety_data two ON one.Address = two.Address;

Spark SQL

SELECT * FROM 
    chicago.safety_data one 
INNER JOIN chicago.safety_data two ON one.Address = two.Address

DataFrame API (C#)

var dataFrame = spark.Read().Table("chicago.safety_data");
dataFrame.Alias("one")
            .Join(
                    dataFrame.Alias("two")
                    , Functions.Col("one.Address") == Functions.Col("two.Address")
                    , "inner").Show();

T-SQL LEFT OUTER JOIN

SELECT * FROM 
    chicago.safety_data one 
LEFT OUTER JOIN chicago.safety_data two ON one.Address = two.Address;

Spark SQL

SELECT * FROM 
    chicago.safety_data one 
LEFT OUTER JOIN chicago.safety_data two ON one.Address = two.Address

DataFrame API (C#)

var dataFrame = spark.Read().Table("chicago.safety_data");
dataFrame.Alias("one")
            .Join(
                    dataFrame.Alias("two")
                    , Functions.Col("one.Address") == Functions.Col("two.Address")
                    , "left_outer").Show();  //alternative "left" or "leftouter"

T-SQL RIGHT OUTER JOIN

SELECT * FROM 
    chicago.safety_data one 
RIGHT OUTER JOIN chicago.safety_data two ON one.Address = two.Address;

Spark SQL

SELECT * FROM 
    chicago.safety_data one 
RIGHT OUTER JOIN chicago.safety_data two ON one.Address = two.Address

DataFrame API (C#)

var dataFrame = spark.Read().Table("chicago.safety_data");
dataFrame.Alias("one")
            .Join(
                    dataFrame.Alias("two")
                    , Functions.Col("one.Address") == Functions.Col("two.Address")
                    , "right_outer").Show();   //alternative "right" or "rightouter"

T-SQL FULL OUTER JOIN

SELECT * FROM 
    chicago.safety_data one 
FULL OUTER JOIN chicago.safety_data two ON one.Address = two.Address;

Spark SQL


SELECT * FROM 
    chicago.safety_data one 
FULL OUTER JOIN chicago.safety_data two ON one.Address = two.Address

DataFrame API (C#)

var dataFrame = spark.Read().Table("chicago.safety_data");
dataFrame.Alias("one")
            .Join(
                    dataFrame.Alias("two")
                    , Functions.Col("one.Address") == Functions.Col("two.Address")
                    , "full_outer").Show(); //alternative "outer" or "fullouter"

T-SQL CROSS JOIN

SELECT * FROM 
    chicago.safety_data one 
CROSS JOIN chicago.safety_data two;

Spark SQL

SELECT * FROM 
    chicago.safety_data one 
CROSS JOIN chicago.safety_data two

DataFrame API (C#)

Note that we must use CrossJoin rather than Join when we don’t pass in a list of columns to join on.
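A sketch of what that looks like, following the same pattern as the joins above:

var dataFrame = spark.Read().Table("chicago.safety_data");
// CrossJoin only takes the right-hand DataFrame; there is no join condition to pass
dataFrame.Alias("one")
            .CrossJoin(dataFrame.Alias("two"))
            .Show();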

Synapse Analytics and .NET for Apache Spark Example 3 - CTE()

The next example is how to do a CTE (Common Table Expression). When creating the CTE I will also rename one of the columns, from “dateTime” to “x”.

T-SQL

WITH CTE(x, dataType, dataSubType)
AS
(
  SELECT dateTime, dataType, dataSubType FROM chicago.safety_data
)
SELECT * FROM CTE;

Spark SQL

WITH CTE 
AS 
    (SELECT dateTime as x, dataType, dataSubType FROM chicago.safety_data)

SELECT * FROM CTE

DataFrame API (C#)

The DataFrame example is a bit odd: by creating a DataFrame with the first query, we already have the CTE that we can use:
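Something along these lines, assuming the same table as the earlier examples; the Select with the aliased column plays the part of the CTE:

var cte = spark.Read().Table("chicago.safety_data")
              .Select(
                    Functions.Col("dateTime").Alias("x")
                    , Functions.Col("dataType")
                    , Functions.Col("dataSubType"));

// "cte" is just a DataFrame, so we use it exactly as we would have used the CTE
cte.Show();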