Exploring the Spark Connect gRPC API


Goal of this post

In the first two posts, we looked at how to run some Spark code, first against a local Spark Connect server and then against a Databricks cluster. In this post, we will look more closely at the gRPC API itself, namely ExecutePlan, Config, and AddArtifacts/ArtifactStatus.

SparkConnectService.SparkConnectServiceClient

We call the API through the SparkConnectServiceClient. When we take the .proto files from the Spark repo and add them to a Visual Studio project with the Protobuf build action that comes from the Grpc.Tools NuGet package, the SparkConnectService is generated as C# code:

#pragma warning disable 0414, 1591
#region Designer generated code

using grpc = global::Grpc.Core;

namespace Spark.Connect {
  /// <summary>
  /// Main interface for the SparkConnect service.
  /// </summary>
  public static partial class SparkConnectService
  {
    static readonly string __ServiceName = "spark.connect.SparkConnectService";

    [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
    static void __Helper_SerializeMessage(global::Google.Protobuf.IMessage message, grpc::SerializationContext context)
...

We used this in our previous examples, passing in a gRPC channel so that the client could be created:

var channel = GrpcChannel.ForAddress("http://localhost:15002", new GrpcChannelOptions(){});
await channel.ConnectAsync();

var client = new SparkConnectService.SparkConnectServiceClient(channel);

SparkConnectServiceClient

This is the object that is responsible for sending requests to the server.

ExecutePlan

ExecutePlan is what takes our logical plan and sends it to the server; this is the whole point of the service.

Returns an AsyncServerStreamingCall<ExecutePlanResponse>, which is a stream of ExecutePlanResponse messages.

Parameter Name | Required | Description
ExecutePlanRequest request | Yes | The details of the request, includes the actual plan that we want executed
Metadata headers | Yes | The headers for the request (when using Databricks, the cluster id and authentication header)
DateTime deadline | No | If specified, the time that the request MUST complete by or fail
CancellationToken cancellationToken | No | You know what this is

Config

Allows you to read, write or unset a Spark config item. There is also an async version, ConfigAsync, generated alongside it on the gRPC client.

Returns a ConfigResponse; the response contains a different set of fields depending on the operation that was requested.

Operations:

Operation | Description
Get | Get the value of a config item
GetAll | Get all the config items
Set | Set the value of a config item
Unset | Unset the value of a config item

Parameters:

Parameter Name | Required | Description
ConfigRequest request | Yes | The details of the request, including the config operation that we want to run
Metadata headers | Yes | The headers for the request (when using Databricks, the cluster id and authentication header)
DateTime deadline | No | If specified, the time that the request MUST complete by or fail
CancellationToken cancellationToken | No | You know what this is

Example GetAll


var sessionId = Guid.NewGuid().ToString();

var channel = GrpcChannel.ForAddress("http://localhost:15002", new GrpcChannelOptions(){});
await channel.ConnectAsync();

var client = new SparkConnectService.SparkConnectServiceClient(channel);

var response = client.Config(new ConfigRequest()
{
    Operation = new ConfigRequest.Types.Operation()
    {
        GetAll = new ConfigRequest.Types.GetAll()
        {

        }
    }, SessionId = sessionId
}, new Metadata());

foreach (var config in response.Pairs)
{
    Console.WriteLine($"{config.Key} = {config.Value}");
}

foreach (var warning in response.Warnings)
{
    Console.WriteLine($"Warning: {warning}");
}
Console.WriteLine("Done");

This results in:

spark.sql.warehouse.dir = file:/Users/ed/spark-warehouse
spark.executor.extraJavaOptions = -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false
spark.driver.host = 192.168.68.58
spark.driver.port = 50308
spark.jars = file:///Users/ed/.ivy2/jars/org.apache.spark_spark-connect_2.12-3.4.0.jar,file:///Users/ed/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar
spark.app.name = Spark Connect server
...
Done

Example Set

const string key = "goeddie.runtime.language";
const string value = ".NET SPARK FUN";

var response = client.Config(new ConfigRequest()
{
    Operation = new ConfigRequest.Types.Operation()
    {
        Set = new ConfigRequest.Types.Set()
        { 
            Pairs = { new KeyValue(){ Key = key, Value = value} }
        }
    }, SessionId = sessionId
}, new Metadata());

foreach (var warning in response.Warnings)
{
    Console.WriteLine($"Warning: {warning}");
}

//In 3.4 we could send a request with a set and a get in the same request and the
// config value we just set would be returned, in 3.5 we get a failure so print out
// the key and the value - if you wanted to make sure that it was actually set you
// can make a separate request to get the value (seems a bit excessive though to me)

Console.WriteLine($"Set '{key}' to '{value}'");

Example Get


var response = client.Config(new ConfigRequest()
{
    Operation = new ConfigRequest.Types.Operation()
    {
        Get = new ConfigRequest.Types.Get()
        {
            Keys = { "goeddie.runtime.language" }
        }
    }, SessionId = sessionId
}, new Metadata());

foreach (var config in response.Pairs)
{
    Console.WriteLine($"{config.Key} = {config.Value}");
}

foreach (var warning in response.Warnings)
{
    Console.WriteLine($"Warning: {warning}");
}

Example Unset

var response = client.Config(new ConfigRequest()
{
    Operation = new ConfigRequest.Types.Operation()
    {
        Unset = new ConfigRequest.Types.Unset()
        { 
            Keys = { "goeddie.runtime.language" }
        }
    }, SessionId = sessionId
}, new Metadata());

foreach (var warning in response.Warnings)
{
    Console.WriteLine($"Warning: {warning}");
}

Artifacts Add / Status

AddArtifacts creates an AsyncClientStreamingCall<AddArtifactsRequest, AddArtifactsResponse>, which can then be used to submit chunks of the artifact until it is uploaded. I have a feeling we will be able to use this to upload custom code to the cluster and then use it to run UDFs from our Spark code, but I haven’t spent any time trying to get that to work.

Operation | Description
AddArtifacts | Deploys artifacts to the cluster
ArtifactStatus | Checks the status of a deployed artifact

AddArtifacts

When a file is uploaded there are a few different areas the file can be uploaded to:

  • cache/
  • pyfiles/
  • files/
  • jars/
  • archives/

If the file path doesn’t start with one of these, then it looks like the file isn’t saved anywhere.

Each file also needs a valid CRC; if the CRC sent with the file doesn’t match, the upload will fail. I used a NuGet package, “Crc32.NET”, which seems to work fine.
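
To make the Crc field a bit less mysterious, here is a minimal sketch of the standard CRC-32 checksum (the reflected 0xEDB88320 polynomial). I am assuming this is the same variant that Crc32Algorithm.Compute from Crc32.NET produces and that the server checks against; I have not verified that against the server code. The class name Crc32Sketch is made up for this example.

```csharp
public static class Crc32Sketch
{
    // Lookup table for the reflected polynomial 0xEDB88320 (standard CRC-32).
    private static readonly uint[] Table = BuildTable();

    private static uint[] BuildTable()
    {
        var table = new uint[256];
        for (uint i = 0; i < 256; i++)
        {
            uint entry = i;
            for (int bit = 0; bit < 8; bit++)
            {
                // Shift right; XOR in the polynomial when the low bit was set.
                entry = (entry & 1) != 0 ? (entry >> 1) ^ 0xEDB88320u : entry >> 1;
            }
            table[i] = entry;
        }
        return table;
    }

    // Returns the CRC-32 of the whole buffer.
    public static uint Compute(byte[] data)
    {
        uint crc = 0xFFFFFFFFu;
        foreach (byte b in data)
        {
            crc = (crc >> 8) ^ Table[(crc ^ b) & 0xFF];
        }
        return crc ^ 0xFFFFFFFFu;
    }
}
```

If the assumption about the variant holds, Crc32Sketch.Compute(fileContent) should give the same value as the NuGet call, so in practice the package is the easier choice.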

There are a couple of different ways to upload artifacts: large artifacts should be chunked, and small ones can be uploaded in a single batch:

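// Opens a client-streaming call; requests are written to RequestStream
var requestCall = client.AddArtifacts(new Metadata());

// UTF-8 so that the uploaded bytes are a valid Python source file
var fileContent = Encoding.UTF8.GetBytes("print('hello, world!')");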

var request = new AddArtifactsRequest()
{
    Batch = new AddArtifactsRequest.Types.Batch()
    {
        Artifacts =
        {
            new AddArtifactsRequest.Types.SingleChunkArtifact()
            {
                Name = "cache/file.py",
                Data = new AddArtifactsRequest.Types.ArtifactChunk()
                {
                    Data = ByteString.CopyFrom(fileContent), Crc = Crc32Algorithm.Compute(fileContent)
                }
            }
        }
    }, SessionId = sessionId
};

await requestCall.RequestStream.WriteAsync(request);

await requestCall.RequestStream.CompleteAsync();

// Wait up to 30 seconds for the server's summary of the upload
var addResponse = await requestCall.ResponseAsync.WaitAsync(TimeSpan.FromSeconds(30));

Console.WriteLine($"Was Crc Successful? {addResponse.Artifacts[0].IsCrcSuccessful}");
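
I haven’t tried the chunked route for large files yet, but the client-side part of it is just splitting the bytes up. The following is a rough sketch of that splitting step only, using nothing but the standard library. ArtifactChunker and Split are hypothetical names, and how each piece then gets wrapped into an AddArtifactsRequest is not shown here because I haven’t tested it.

```csharp
using System;
using System.Collections.Generic;

public static class ArtifactChunker
{
    // Splits data into consecutive pieces of at most chunkSize bytes.
    // The last piece may be shorter; empty input gives no pieces.
    public static List<byte[]> Split(byte[] data, int chunkSize)
    {
        if (chunkSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(chunkSize));
        }

        var chunks = new List<byte[]>();
        for (int offset = 0; offset < data.Length; offset += chunkSize)
        {
            int length = Math.Min(chunkSize, data.Length - offset);
            var chunk = new byte[length];
            Array.Copy(data, offset, chunk, 0, length);
            chunks.Add(chunk);
        }
        return chunks;
    }
}
```

Each piece would presumably need its own CRC, computed the same way as for the single-batch upload, but treat that as an assumption until it has been tried against a server.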

Note: if you start the Spark Connect server on Windows, the Scala code uses the path separator from the OS, so the names of any uploaded files need to be in the format cache\file.py rather than cache/file.py. This is down to the server, not the client. Perhaps running in WSL is just the way to go tbh.

For example:

https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala#L185

if (name.startsWith(s"archives${File.separator}")) {

Here File.separator will resolve to \ on Windows and / on everything else.
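
If you do need to talk to a server running on Windows, one option is to rewrite the artifact name on the client before sending it. This is only a sketch: ArtifactNames and ForServer are made-up names, the caller has to know which OS the server is running on, and I am assuming that swapping every / for \ is acceptable to the server.

```csharp
public static class ArtifactNames
{
    // Rewrites an artifact name such as "cache/file.py" to use the
    // separator the server expects. serverIsWindows must be supplied
    // by the caller; nothing here detects it.
    public static string ForServer(string name, bool serverIsWindows)
    {
        return serverIsWindows ? name.Replace('/', '\\') : name;
    }
}
```

For example, ArtifactNames.ForServer("cache/file.py", true) gives cache\file.py, while passing false leaves the name untouched.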

ArtifactStatus

When we pass a filename to the API and ask for the status, the API only ever returns a status if the file is in the cache/ store. If we upload to pyfiles/ or anywhere else, then the status will be empty.

If we have uploaded to cache/ then if the file exists we will get a status back:

var request = new ArtifactStatusesRequest()
{
    Names = { "cache/file.py" }, SessionId = sessionId
};

var response = client.ArtifactStatus(request);
foreach (var status in response.Statuses)
{                                                      
    Console.WriteLine($"Artifact: '{status.Key}' status: '{status.Value}' ");    
}

This results in:

Artifact: 'cache/file.py' status: '{ "exists": true }' 

The relevant Spark code is: https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectArtifactStatusesHandler.scala#L48 and the call is more like “Does the file exist in the cache?” rather than “Does the file exist in the cluster?”:

val exists = if (name.startsWith("cache/")) {
  cacheExists(
    userId = request.getUserContext.getUserId,
    sessionId = request.getSessionId,
    hash = name.stripPrefix("cache/"))
} else false

If it is false, then we get an empty status back:

Artifact: 'cache/AnotherFileNeverUploaded.jar' status: '{ }' 

Keep an eye on the Scala code; if it ever changes, then it might become possible to check the status of files outside of the cache.
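
Until then, a client can save itself a pointless lookup by only asking about names the server can actually report on. The helper below is a small sketch that mirrors the cache/ prefix check from the Scala code above on the client side. ArtifactStatusHints and its methods are hypothetical names, and it would need updating if the server logic changes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ArtifactStatusHints
{
    private const string CachePrefix = "cache/";

    // True when the name is in the cache/ store, the only place the
    // server currently looks when asked for a status.
    public static bool CanReportStatus(string name)
    {
        return name.StartsWith(CachePrefix, StringComparison.Ordinal);
    }

    // Keeps only the names that are worth sending in a status request.
    public static IReadOnlyList<string> NamesWorthChecking(IEnumerable<string> names)
    {
        return names.Where(CanReportStatus).ToList();
    }
}
```

The filtered list could then be added to Names on the ArtifactStatusesRequest instead of the full set of uploaded names.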

Other calls

In the next set of posts I will continue looking at the gRPC API itself, the calls we can make and some of the objects. The response stream from executing requests is interesting because it returns relations using Apache Arrow, so we can print out the results of things like DataFrame.Show(), which is pretty exciting.