.NET for Apache Spark UDFs Missing Shared State
The Problem
Consider a UDF in .NET for Apache Spark, used with code like this:
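using System;
using System.Text;
using Microsoft.Spark.Sql;

class Program
{
    static void Main(string[] args)
    {
        var spark = SparkSession.Builder().GetOrCreate();
        _logging.AppendLine("Starting Select");

        // Register theUdf as a UDF and apply it to the "id" column of a 100-row DataFrame
        var udf = Functions.Udf<int, string>(theUdf);
        spark.Range(100).Select(udf(Functions.Col("id"))).Show();

        _logging.AppendLine("Ending Select");
        Console.WriteLine(_logging.ToString());
    }

    // Initialized as part of the field declaration
    private static readonly StringBuilder _logging = new StringBuilder();

    // Logs every value it is called with and returns a string for the DataFrame
    private static string theUdf(int val)
    {
        _logging.AppendLine($"udf passed: {val}");
        return $"udf passed: {val}";
    }
}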
Knowing .NET, we would generally expect the following output:
+-------------------------------------+
|System.String <Main>b__0_0(Int32)(id)|
+-------------------------------------+
|                        udf passed: 0|
|                        udf passed: 1|
|                        udf passed: 2|
|                        udf passed: 3|
|                        udf passed: 4|
|                        udf passed: 5|
|                        udf passed: 6|
|                        udf passed: 7|
|                        udf passed: 8|
|                        udf passed: 9|
|                       udf passed: 10|
|                       udf passed: 11|
|                       udf passed: 12|
|                       udf passed: 13|
|                       udf passed: 14|
|                       udf passed: 15|
|                       udf passed: 16|
|                       udf passed: 17|
|                       udf passed: 18|
|                       udf passed: 19|
+-------------------------------------+
only showing top 20 rows
Starting Select
udf passed: 0
udf passed: 1
udf passed: 2
udf passed: 3
udf passed: 4
udf passed: 5
...
udf passed: 99
Ending Select
However, what we will actually see is:
+-------------------------------------+
|System.String <Main>b__0_0(Int32)(id)|
+-------------------------------------+
|                        udf passed: 0|
|                        udf passed: 1|
|                        udf passed: 2|
|                        udf passed: 3|
|                        udf passed: 4|
|                        udf passed: 5|
|                        udf passed: 6|
|                        udf passed: 7|
|                        udf passed: 8|
|                        udf passed: 9|
|                       udf passed: 10|
|                       udf passed: 11|
|                       udf passed: 12|
|                       udf passed: 13|
|                       udf passed: 14|
|                       udf passed: 15|
|                       udf passed: 16|
|                       udf passed: 17|
|                       udf passed: 18|
|                       udf passed: 19|
+-------------------------------------+
only showing top 20 rows
Starting Select
Ending Select
We know our UDF was called, because the DataFrame contains the data our UDF returned, but it is as if the StringBuilder never received any of the UDF's calls to AppendLine.
Further, if we hadn't initialized the StringBuilder as part of its declaration but had instead initialized it in our Main method, like this:
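using System;
using System.Text;
using Microsoft.Spark.Sql;

class Program
{
    static void Main(string[] args)
    {
        var spark = SparkSession.Builder().GetOrCreate();

        // Initialized in Main instead of in the field declaration
        _logging = new StringBuilder();
        _logging.AppendLine("Starting Select");

        // Register theUdf as a UDF and apply it to the "id" column of a 100-row DataFrame
        var udf = Functions.Udf<int, string>(theUdf);
        spark.Range(100).Select(udf(Functions.Col("id"))).Show();

        _logging.AppendLine("Ending Select");
        Console.WriteLine(_logging.ToString());
    }

    private static StringBuilder _logging;

    // Logs every value it is called with and returns a string for the DataFrame
    private static string theUdf(int val)
    {
        _logging.AppendLine($"udf passed: {val}");
        return $"udf passed: {val}";
    }
}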
What we see now is even worse: our UDF throws an exception:
ProcessStream() failed with exception: System.NullReferenceException: Object reference not set to an instance of an object.
The stack trace points to the _logging.AppendLine call in our UDF, yet the _logging object MUST have been initialized: if it hadn't been, the line _logging.AppendLine("Starting Select"); would have thrown the exception first, and it didn't.
The Cause
To understand this, we need to look at how a UDF written in .NET can be called by the Apache Spark code running in the Java VM, because that is, logically, what happens. In our application we call into Apache Spark and ask it to do things like read from a file, run some transformations and write files back out again. With UDFs, we ask Spark to run a UDF, and Spark comes back to our UDF, passes it some data and asks it to execute. The catch is that the Java VM does not understand how to execute .NET code.
This is how the Java VM gets a .NET UDF called:
- We run spark-submit, asking it to run org.apache.spark.deploy.dotnet.DotnetRunner, and pass it the parameters needed to run dotnet and our application
- Our .NET for Apache Spark application starts, connects to the DotnetRunner and asks Apache Spark to run:
spark.Range(100).Select(udf(Functions.Col("id"))).Show();
- Apache Spark creates a DataFrame with 100 rows and sees that it needs to execute a UDF
- Apache Spark then creates a new process to execute the UDF. This is the dotnet worker application, but it could be a Python application, or an application written in any language, as long as it speaks the protocol Apache Spark expects
- The worker application starts listening on a port that Apache Spark connects to
- Apache Spark sends the DataFrame column to the worker process over the TCP connection, and the worker sends the results back the same way
- When Apache Spark has the results it needs, it carries on with the query, which in our case means displaying the rows with Show()
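The data exchange in the last few steps can be pictured as a simple request/response loop over a socket. The sketch below is a hypothetical toy, not the protocol Spark actually uses: a "worker" listens on a loopback port, the "Spark" side connects, sends each id as a line of text and reads back one UDF result per line. The class name ToyWorkerProtocol and the line-based framing are assumptions made for illustration, and both ends run in one process purely for convenience. What it shows is that only values travel over the connection, never the caller's objects or state.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical toy: a made-up line-based protocol, NOT the real Spark worker wire format.
public static class ToyWorkerProtocol
{
    // Stand-in for the UDF that the worker executes.
    public static string TheUdf(int val) => $"udf passed: {val}";

    public static async Task Main()
    {
        // "Worker": listen on a free loopback port chosen by the OS.
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        int port = ((IPEndPoint)listener.LocalEndpoint).Port;

        Task worker = Task.Run(async () =>
        {
            using TcpClient connection = await listener.AcceptTcpClientAsync();
            NetworkStream stream = connection.GetStream();
            var reader = new StreamReader(stream);
            var writer = new StreamWriter(stream) { AutoFlush = true };

            // Toy framing: one input value per line in, one result per line out.
            string? line;
            while ((line = await reader.ReadLineAsync()) != null)
            {
                await writer.WriteLineAsync(TheUdf(int.Parse(line)));
            }
        });

        // "Spark": connect to the worker, send column values, read back results.
        using (var client = new TcpClient())
        {
            await client.ConnectAsync(IPAddress.Loopback, port);
            NetworkStream stream = client.GetStream();
            var reader = new StreamReader(stream);
            var writer = new StreamWriter(stream) { AutoFlush = true };

            for (int id = 0; id < 3; id++)
            {
                await writer.WriteLineAsync(id.ToString());
                Console.WriteLine(await reader.ReadLineAsync());
            }

            // Signal "no more rows" so the worker's read loop ends.
            client.Client.Shutdown(SocketShutdown.Send);
            await worker;
        }

        listener.Stop();
    }
}
```

Running it prints "udf passed: 0", "udf passed: 1" and "udf passed: 2". Because the worker side only ever sees the text that arrives on the socket, anything the caller set up in its own memory is simply not available to it.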
The key here is that a new process is started. In .NET land, this means that the new process starts, finds the DLL that was used for the main application, uses Assembly.Load to load the assembly and then calls GetType on it. When the type has been found, the UDF delegate is recreated using Delegate.CreateDelegate. This bypasses any setup your application ran up until the point of asking for the UDF to be called, including everything in Main.
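The fact that the code is compiled into a single DLL, that it might sit physically next to Main in your IDE, and that it is private (or even an anonymous lambda) so no one else can get to it, is all irrelevant. The code is, effectively, plucked out of your DLL and executed in a totally different process, so no shared or previously initialized state passes between your application and the UDF process. This explains both results: loading the type in the worker runs its static field initializers, so in the first example the UDF appends to the worker's own StringBuilder, which is never printed, while in the second example the assignment lives in Main, which the worker never runs, so _logging is still null there.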
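The effect can be approximated inside a single process. The sketch below is a hedged illustration, not the Microsoft.Spark worker code: it uses a second AssemblyLoadContext (.NET Core 3.0 or later) as a stand-in for the worker process, because each load context holds its own copy of an assembly's static fields. It then follows the steps described above: load the assembly, call GetType, and rebuild the UDF with Delegate.CreateDelegate. The class names UdfHost and Demo and both field names are hypothetical, and it assumes a normal (not single-file) build, since Assembly.Location is empty for single-file apps.

```csharp
using System;
using System.Reflection;
using System.Runtime.Loader;
using System.Text;

// Hypothetical stand-in for the Program class in the examples above.
public static class UdfHost
{
    // Field initializer: runs as part of the type initializer in every loaded copy of the type.
    public static readonly StringBuilder InitializedInline = new StringBuilder();

    // Assigned only by code the "driver" runs, like _logging in the second example.
    public static StringBuilder? AssignedByDriver;

    public static string TheUdf(int val)
    {
        InitializedInline.AppendLine($"udf passed: {val}");
        // In the second example this would be AssignedByDriver.AppendLine(...),
        // which throws NullReferenceException when the field is null.
        return AssignedByDriver is null ? "AssignedByDriver is null" : "AssignedByDriver is set";
    }
}

public static class Demo
{
    public static void Main()
    {
        // "Driver" setup, equivalent to the code in Main in the second example.
        UdfHost.AssignedByDriver = new StringBuilder();

        // Calling the UDF through the driver's own copy of the type sees the driver's state.
        Console.WriteLine(UdfHost.TheUdf(1)); // AssignedByDriver is set

        // Simulated worker: load a second, isolated copy of this assembly,
        // find the type and method by name and rebuild the delegate from them.
        var context = new AssemblyLoadContext("simulated-worker");
        Assembly copy = context.LoadFromAssemblyPath(typeof(UdfHost).Assembly.Location);
        Type type = copy.GetType(typeof(UdfHost).FullName!)!;
        MethodInfo method = type.GetMethod(nameof(UdfHost.TheUdf))!;
        var udf = (Func<int, string>)Delegate.CreateDelegate(typeof(Func<int, string>), method);

        Console.WriteLine(udf(2)); // AssignedByDriver is null

        // The copy appended "udf passed: 2" to its own StringBuilder, not to ours.
        Console.WriteLine(UdfHost.InitializedInline.ToString().Trim()); // udf passed: 1
    }
}
```

The call through the rebuilt delegate runs against a copy of the type in which only the field initializer has executed, which mirrors what the separate worker process sees.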