Exploring the Spark Connect gRPC API
Goal of this post
In the first two posts, we looked at how to run some Spark code, firstly against a local Spark Connect server and then against a Databricks cluster. In this post, we will look more at the actual gRPC API itself, namely ExecutePlan,
Config
, and AddArtifacts
/ArtifactsStatus.
SparkConnectService.SparkConnectServiceClient
The way we call the API is using the SparkConnectServiceClient
, when we take the .proto files from the Spark repo, and we add them to a visual studio with the Protobuf
build action that comes from the Grpc.Tools
NuGet package then the SparkConnectService
is generated as C# code.
#pragma warning disable 0414, 1591
#region Designer generated code
using grpc = global::Grpc.Core;
namespace Spark.Connect {
/// <summary>
/// Main interface for the SparkConnect service.
/// </summary>
public static partial class SparkConnectService
{
static readonly string __ServiceName = "spark.connect.SparkConnectService";
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static void __Helper_SerializeMessage(global::Google.Protobuf.IMessage message, grpc::SerializationContext context)
...
We have used this in our previous examples to pass a gRPC channel so that the client can be created:
var channel = GrpcChannel.ForAddress("http://localhost:15002", new GrpcChannelOptions(){});
await channel.ConnectAsync();
var client = new SparkConnectService.SparkConnectServiceClient(channel);
SparkConnectServiceClient
This is the object that is responsible for sending requests to the server.
ExecutePlan
ExecutePlan
is what takes our logical plan and sends it to the server, this is the whole point of the service.
Returns ExecutePlanResponse
Parameter | Name | Required | Description |
---|---|---|---|
ExecutePlanRequest |
request | Yes | The details of the request, includes the actual plan that we want executed |
Metadata |
headers | Yes | The headers for the request (When Databricks, cluster id and authentication header) |
DateTime |
deadline | No | If specified the time that the request MUST complete by or fail |
CancellationToken |
cancellationToken | No | You know what this is |
Config
Allows you to read or write (or unset) a spark config item. There is also an async ConfigAsync
that is generated by the gRPC client.
Returns a ConfigResponse
, the response contains a different set of options depending on the operation that was requested.
Operations:
Operation | Description |
---|---|
Get |
Get the value of a config item |
GetAll |
Get all the config items |
Set |
Set the value of a config item |
Unset |
Unset the value of a config item |
Parameter | Name | Required | Description |
---|---|---|---|
ConfigRequest |
request | Yes | The details of the request, includes the actual plan that we want executed |
Metadata |
headers | Yes | The headers for the request (When Databricks, cluster id and authentication header) |
DateTime |
deadline | No | If specified the time that the request MUST complete by or fail |
CancellationToken |
cancellationToken | No | You know what this is |
Example GetAll
var sessionId = Guid.NewGuid().ToString();
var channel = GrpcChannel.ForAddress("http://localhost:15002", new GrpcChannelOptions(){});
await channel.ConnectAsync();
var client = new SparkConnectService.SparkConnectServiceClient(channel);
var response = client.Config(new ConfigRequest()
{
Operation = new ConfigRequest.Types.Operation()
{
GetAll = new ConfigRequest.Types.GetAll()
{
}
}, SessionId = sessionId
}, new Metadata());
foreach (var config in response.Pairs)
{
Console.WriteLine($"{config.Key} = {config.Value}");
}
foreach (var warning in response.Warnings)
{
Console.WriteLine($"Warning: {warning}");
}
Console.WriteLine("Done");
spark.sql.warehouse.dir = file:/Users/ed/spark-warehouse
spark.executor.extraJavaOptions = -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false
spark.driver.host = 192.168.68.58
spark.driver.port = 50308
spark.jars = file:///Users/ed/.ivy2/jars/org.apache.spark_spark-connect_2.12-3.4.0.jar,file:///Users/ed/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar
spark.app.name = Spark Connect server
...
Done
Example Set
const string key = "goeddie.runtime.language";
const string value = ".NET SPARK FUN";
var response = client.Config(new ConfigRequest()
{
Operation = new ConfigRequest.Types.Operation()
{
Set = new ConfigRequest.Types.Set()
{
Pairs = { new KeyValue(){ Key = key, Value = value} }
}
}, SessionId = sessionId
}, new Metadata());
foreach (var warning in response.Warnings)
{
Console.WriteLine($"Warning: {warning}");
}
//In 3.4 we could send a request with a set and a get in the same request and the
// config value we just set would be returned, in 3.5 we get a failure so print out
// the key and the value - if you wanted to make sure that it was actually set you
// can make a separate request to get the value (seems a bit excessive though to me)
Console.WriteLine($"Set '{key}' to '{value}'");
Example Get
var response = client.Config(new ConfigRequest()
{
Operation = new ConfigRequest.Types.Operation()
{
Get = new ConfigRequest.Types.Get()
{
Keys = { "goeddie.runtime.language" }
}
}, SessionId = sessionId
}, new Metadata());
foreach (var config in response.Pairs)
{
Console.WriteLine($"{config.Key} = {config.Value}");
}
foreach (var warning in response.Warnings)
{
Console.WriteLine($"Warning: {warning}");
}
Example Unset
var response = client.Config(new ConfigRequest()
{
Operation = new ConfigRequest.Types.Operation()
{
Unset = new ConfigRequest.Types.Unset()
{
Keys = { "goeddie.runtime.language" }
}
}, SessionId = sessionId
}, new Metadata());
foreach (var warning in response.Warnings)
{
Console.WriteLine($"Warning: {warning}");
}
Artifacts Add / Status
AddArtifacts
creates an AsyncClientStreamCall<AddArtifactsRequest, AddArtifactsResponse>
which can then be used to submit chunks of the artifact until it is uploaded. I have a feeling we will be able to use this to upload custom code to the cluster and then use it to run UDF’s from our Spark code but I haven’t spent any time trying to get that to work.
Operation | Description |
---|---|
AddArtifacts |
Deploys artifacts to the cluster |
ArtifactStatus |
Checks the status of a deployed artifact |
AddArtifacts
When a file is uploaded there are a few different areas the file can be uploaded to:
- cache/
- pyfiles/
- files/
- jars/
- archives/
if the file path doesn’t start with one of these then it looks like the file isn’t saved anywhere.
The files also need to have a valid Crc, if the file doesn’t have a valid crc then it will fail. I used a NuGet package “Crc32.NET” which seems to work fine.
There are a couple of different ways to upload artifacts, if it is large then they should be chunked, if they are small then you can upload in a single batch:
var requestCall = client.AddArtifacts(new Metadata());
var fileContent = Encoding.Unicode.GetBytes("print('hello, world!)'");
var request = new AddArtifactsRequest()
{
Batch = new AddArtifactsRequest.Types.Batch()
{
Artifacts =
{
new AddArtifactsRequest.Types.SingleChunkArtifact()
{
Name = "cache/file.py",
Data = new AddArtifactsRequest.Types.ArtifactChunk()
{
Data = ByteString.CopyFrom(fileContent), Crc = Crc32Algorithm.Compute(fileContent)
}
}
}
}, SessionId = sessionId
};
await requestCall.RequestStream.WriteAsync(request);
await requestCall.RequestStream.CompleteAsync();
await requestCall.ResponseAsync.WaitAsync(TimeSpan.FromSeconds(30));
Console.WriteLine($"Was Crc Successful? {requestCall.ResponseAsync.Result.Artifacts[0].IsCrcSuccessful}");
Note if you start the Spark Connect server on Windows then the Scala code uses the separator from the OS so any files uploaded will need to be in the format cache\file.py
rather than cache/file.py
. This isn’t the client, it is the server. Perhaps running in WSL is just the way to go tbh.
For example:
if (name.startsWith(s"archives${File.separator}")) {
will resolve to \ on Windows and / on everything else.
ArtifactsStatus
When we pass a filename in to the API and ask for the status, the API only ever returns a status if the file is in the cache/
store, if we upload to pyfiles/
or anywhere else then the status will be empty.
If we have uploaded to cache/
then if the file exists we will get a status back:
var request = new ArtifactStatusesRequest()
{
Names = { "cache/file.py" }, SessionId = sessionId
};
var response = client.ArtifactStatus(request);
foreach (var status in response.Statuses)
{
Console.WriteLine($"Artifact: '{status.Key}' status: '{status.Value}' ");
}
this results in:
Artifact: 'cache/file.py' status: '{ "exists": true }'
The relevant Spark code is: https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectArtifactStatusesHandler.scala#L48 and the call is more like “Does the file exist in the cache? rather than does the file exist in the cluster”:
val exists = if (name.startsWith("cache/")) {
cacheExists(
userId = request.getUserContext.getUserId,
sessionId = request.getSessionId,
hash = name.stripPrefix("cache/"))
} else false
if it is false then we get an empty status back:
Artifact: 'cache/AnotherFileNeverUploaded.jar' status: '{ }'
Keep an eye on the scala code, if it ever changes then it might be able to check the status of files outside of the cache.
Other calls
In the next set of posts I will continue looking at the gRPC API itself, the calls we can make and some of the objects. The return streams from executing requests is interesting because it returns relations using Apache Arrow so we can print out the results of things like DataFrame.Show()
which is pretty exciting.