How to get data into a DataFrame via .NET for Apache Spark

When I first started working with Apache Spark, one of the things I struggled with was that I would have some variable or data in my code that I wanted to work on with Apache Spark. Getting the data into a state that Apache Spark can process means putting it into a DataFrame. So how do you take some data and get it into a DataFrame?

This post will cover all the ways to get data into a DataFrame in .NET for Apache Spark.

If you would like a working example, then please see the repo:

https://github.com/GoEddie/CreatingDataFrameInSparkDotnet/blob/main/CreatingDataFrames/Program.cs

## Reading data into a DataFrame

(Save the eye-rolling, but it would be remiss of me not to mention this!) The primary way to get data into Apache Spark is to read the data. Typically you will be reading from a data lake or a Hive table, and Apache Spark is well suited to reading data from a variety of sources:

var tempPath = System.IO.Path.GetTempFileName();
File.WriteAllText(tempPath, "[{\"name\": \"ed\"},{\"name\": \"edd\"},{\"name\": \"eddie\"}]");

Console.WriteLine("spark.Read()");
var dataFrame = spark.Read().Json(tempPath);
dataFrame.Show(5);
/*
 *  +-----+
    | name|
    +-----+
    |   ed|
    |  edd|
    |eddie|
    +-----+
 */

In this example, we use spark.Read().Json(tempPath), which reads a JSON document. We could alternatively have written spark.Read().Format("json").Load(tempPath). The advantage of the second version is that you can pass a parameter to Format, which lets you decide the format at runtime. That is very useful when we use metadata to tell us which files to load.
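As a rough sketch of the metadata-driven case, the snippet below loops over a list of (path, format) pairs and lets each entry decide the format at runtime. The filesToLoad list and its paths are hypothetical stand-ins for whatever metadata you actually have; only Format and Load are taken from the example above.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;

class RuntimeFormatExample
{
    static void Main()
    {
        var spark = SparkSession.Builder().AppName("runtime-format").GetOrCreate();

        // Hypothetical metadata: in practice this might come from a config file
        // or a control table. The paths below are placeholders, not real files.
        var filesToLoad = new List<(string Path, string Format)>
        {
            ("/data/people.json", "json"),
            ("/data/orders.parquet", "parquet")
        };

        foreach (var file in filesToLoad)
        {
            // The format is just a string, so it can be chosen at runtime.
            DataFrame dataFrame = spark.Read().Format(file.Format).Load(file.Path);

            Console.WriteLine($"{file.Path} ({file.Format})");
            dataFrame.Show(5);
        }
    }
}
```

The loop itself never needs to know which formats exist, so supporting a new one should only mean adding a row to the metadata.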

Spark.Range()

I use spark.Range a lot when playing around and testing stuff in Spark. Range lets you pass in the number of rows you want to create, and Spark creates a DataFrame with that many rows and a single column called “id” which is an incrementing number.

var dataFrame = spark.Range(1000);
dataFrame.Show(5);
/*
 *  +---+
    | id|
    +---+
    |  0|
    |  1|
    |  2|
    |  3|
    |  4|
    +---+
 * 
 */
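If you need something other than "0 to n", Range can also take a start, an end, and a step. This is a minimal sketch, assuming the overloads of SparkSession.Range in .NET for Apache Spark mirror the ones in Spark itself, where the end value is exclusive:

```csharp
using Microsoft.Spark.Sql;

class RangeOverloadsExample
{
    static void Main()
    {
        var spark = SparkSession.Builder().AppName("range-overloads").GetOrCreate();

        // Range(start, end): ids from 10 up to, but not including, 15.
        DataFrame fromTen = spark.Range(10, 15);
        fromTen.Show();

        // Range(start, end, step): every fifth id from 0 up to, but not including, 20.
        DataFrame everyFifth = spark.Range(0, 20, 5);
        everyFifth.Show();
    }
}
```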

Warning Obscure Tangent

The thing about using Range is that when I use it, I have this sort of internal monologue that goes along these lines: “blimey, projecting out a number of rows is so much easier with Spark than it was with SQL Server, I remember having to resort to ‘select top @AmountOfRowsIWant * from sys.objects cross join sys.sysindexes’”.

End of Obscure Tangent


Once you have created the DataFrame, you can then use WithColumn to add extra data:

dataFrame = dataFrame.WithColumn("Another Column", Functions.Lit("Literal"));
dataFrame.Show(5);
/*
 *  +---+--------------+
    | id|Another Column|
    +---+--------------+
    |  0|       Literal|
    |  1|       Literal|
    |  2|       Literal|
    |  3|       Literal|
    |  4|       Literal|
    +---+--------------+
 */

Or we can use a Spark function to add an additional column to our DataFrame:

dataFrame = dataFrame.WithColumn("Mod", Functions.Pmod(Functions.Col("id"), Functions.Lit(2)));
dataFrame.Show(5);

/*
 *  +---+--------------+---+
    | id|Another Column|Mod|
    +---+--------------+---+
    |  0|       Literal|  0|
    |  1|       Literal|  1|
    |  2|       Literal|  0|
    |  3|       Literal|  1|
    |  4|       Literal|  0|
    +---+--------------+---+
 */

Range in Spark SQL

The next approach to creating DataFrames is to use the range function again, but this time from Spark SQL rather than the DataFrame API used in the previous example:

var dataFrame = spark.Sql("select id from range(1000)");
dataFrame.Show(5);
/*
 *  +---+
    | id|
    +---+
    |  0|
    |  1|
    |  2|
    |  3|
    |  4|
    +---+
 */

dataFrame = spark.Sql("select id, 'Literal' as `Another Column` from range(1000)");
dataFrame.Show(5);

/*
 *  +---+--------------+
    | id|Another Column|
    +---+--------------+
    |  0|       Literal|
    |  1|       Literal|
    |  2|       Literal|
    |  3|       Literal|
    |  4|       Literal|
    +---+--------------+
 */

dataFrame = spark.Sql("select id, 'Literal' as `Another Column`, pmod(id, 2) as `Mod`  from range(1000)");
dataFrame.Show(5);

/*
 *  +---+--------------+---+
    | id|Another Column|Mod|
    +---+--------------+---+
    |  0|       Literal|  0|
    |  1|       Literal|  1|
    |  2|       Literal|  0|
    |  3|       Literal|  1|
    |  4|       Literal|  0|
    +---+--------------+---+
 */

(Note: I have the same internal monologue with this version, in case you were wondering.)

CreateDataFrame (built-in types)

The next method is to pass an IEnumerable of a built-in type to CreateDataFrame, which will create one row for each item in the collection. The DataFrame will have a single column called “_1”, and the type of the column is the type of the items in the IEnumerable:

var stringArray = new string[] {"a", "b", "c"};
var dataFrame = spark.CreateDataFrame(stringArray);

dataFrame.Show();
/*
 *  +---+
    | _1|
    +---+
    |  a|
    |  b|
    |  c|
    +---+
 */

var stringList = new List<string>() {"d", "e", "f"};
dataFrame = spark.CreateDataFrame(stringList);

dataFrame.Show();
/*
 *  +---+
    | _1|
    +---+
    |  d|
    |  e|
    |  f|
    +---+
 */

var doubleList = new List<double>() {0.0, 1.1, 2.2};
dataFrame = spark.CreateDataFrame(doubleList);

dataFrame.Show();
/*
 *  +---+
    | _1|
    +---+
    |0.0|
    |1.1|
    |2.2|
    +---+
 */

dataFrame = dataFrame.WithColumnRenamed("_1", "double_column");
dataFrame.Show();

/*
 *  +-------------+
    |double_column|
    +-------------+
    |          0.0|
    |          1.1|
    |          2.2|
    +-------------+
 */

dataFrame = dataFrame.WithColumn("literal", Functions.Lit("abc"));
dataFrame.Show();
/*
 *  +-------------+-------+
    |double_column|literal|
    +-------------+-------+
    |          0.0|    abc|
    |          1.1|    abc|
    |          2.2|    abc|
    +-------------+-------+
 */
Console.WriteLine("SelectExpr");
dataFrame =
    dataFrame.SelectExpr("double_column", "literal", "'hello' as literal2", "pmod(double_column, 2)");
dataFrame.Show();
/*
 *  +-------------+-------+--------+--------------------------------------+
    |double_column|literal|literal2|pmod(double_column, CAST(2 AS DOUBLE))|
    +-------------+-------+--------+--------------------------------------+
    |          0.0|    abc|   hello|                                   0.0|
    |          1.1|    abc|   hello|                                   1.1|
    |          2.2|    abc|   hello|                   0.20000000000000018|
    +-------------+-------+--------+--------------------------------------+
 */

dataFrame = dataFrame.WithColumnRenamed("pmod(double_column, CAST(2 AS DOUBLE))", "mod_column");
dataFrame.Show();
/*
 *  +-------------+-------+--------+-------------------+
    |double_column|literal|literal2|         mod_column|
    +-------------+-------+--------+-------------------+
    |          0.0|    abc|   hello|                0.0|
    |          1.1|    abc|   hello|                1.1|
    |          2.2|    abc|   hello|0.20000000000000018|
    +-------------+-------+--------+-------------------+
 */

The last example is where we want to pass in a number of objects to create multiple columns. To use this method, we create a list of GenericRow objects holding the values, plus a StructType which defines the name, type, and nullability of each column:

var rowOne = new GenericRow(new object[]
{
    "columnOne Row One", 1.1
});

var rowTwo = new GenericRow(new object[]
{
    "columnOne Row Two", null
});

var rowThree = new GenericRow(new object[]
{
    "columnOne Row Three", 3.3
});

var rows = new List<GenericRow>()
{
    rowOne, rowTwo, rowThree
};

var structType = new StructType(new List<StructField>()
{
    new StructField("column one", new StringType(), isNullable: false),
    new StructField("column two", new DoubleType(), isNullable: true)
});

var dataFrame = spark.CreateDataFrame(rows, structType);
dataFrame.Show();
/*
 *  +-------------------+----------+
    |         column one|column two|
    +-------------------+----------+
    |  columnOne Row One|       1.1|
    |  columnOne Row Two|      null|
    |columnOne Row Three|       3.3|
    +-------------------+----------+
 */

dataFrame.PrintSchema();

/*
 *  root
     |-- column one: string (nullable = false)
     |-- column two: double (nullable = true)
 */
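Coming back to the original problem of having some data in a variable, the same GenericRow and StructType approach can be fed from an ordinary collection. The sketch below projects a list of tuples into rows with LINQ. The people list, its values, and the column names are made up for illustration, and the order of values in each GenericRow is assumed to line up with the order of the fields in the schema.

```csharp
using System.Collections.Generic;
using System.Linq;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

class RowsFromObjectsExample
{
    static void Main()
    {
        var spark = SparkSession.Builder().AppName("rows-from-objects").GetOrCreate();

        // Hypothetical in-memory data that we want to work on with Spark.
        var people = new List<(string Name, int Age)>
        {
            ("ed", 21),
            ("edd", 32),
            ("eddie", 43)
        };

        // One GenericRow per item; the values are in the same order as the schema fields.
        List<GenericRow> rows = people
            .Select(p => new GenericRow(new object[] { p.Name, p.Age }))
            .ToList();

        var schema = new StructType(new List<StructField>
        {
            new StructField("name", new StringType(), isNullable: false),
            new StructField("age", new IntegerType(), isNullable: false)
        });

        DataFrame dataFrame = spark.CreateDataFrame(rows, schema);
        dataFrame.Show();
        dataFrame.PrintSchema();
    }
}
```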

Summary

There you have it. You can:

  • Read data from a file
  • Use SparkSession.Range() or “select * from range(100)” to project some rows and add columns to them
  • Use SparkSession.CreateDataFrame