spark-dotnet: how to manually break a file into rows and columns

I found a question on Stack Overflow that went something like this:

“I have a file that includes line endings in the wrong place and I need to parse the text manually into rows” (https://stackoverflow.com/questions/57294619/read-a-textfile-of-fixed-length-with-newline-as-one-of-attribute-value-into-a-ja/57317527).

I thought it would be interesting to implement this with what we have available today in spark-dotnet. The thing is, though, that even though this is possible in spark-dotnet (or the other versions of Spark), I would pre-process the file in something else so that by the time Spark reads it, it is already in a suitable format. On that note, let's look at the problem.

You have a data file to read in and, instead of a newline character being a separator, the newline is part of the data - a single row looks like:

020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00001

a group of rows looks like:

020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00001020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00002020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00003020190718000000000000002CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019051AM00004

broken into rows this is:

020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00001

020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00002

020190718000000000000001CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019050AM00003

020190718000000000000002CHATHOLDER SUBCONTRACT1MNV3.0
LOGFILEGENAT07312019051AM00004

Weird, hey! Anyway, we all get dealt hands we don't want sometimes :)

So the first thing we need to do is read the whole file in as one chunk; if we just do a standard read, the file will be broken into rows based on the newline character:

var file = spark.Read().Option("wholeFile", true).Text(@"C:\git\files\newline-as-data.txt");

The “wholeFile” option is supposed to read the file in as a single value, but for some reason it wasn't working. I also tried Option("multiLine", true), but nothing. In the end I was left with two choices: either let Spark break the file on the newline character and concat all the rows back together, which felt unsatisfactory, or change the line separator to a character I knew wouldn't be in the file (my demo, my rules!), so I decided on the null character:

var file = spark.Read().Option("wholeFile", true).Option("lineSep", "\0").Text(@"C:\git\files\newline-as-data.txt");

When I run this, file is a data frame with a single row and a single column in it:

Single Value Data Frame(https://the.agilesql.club/assets/images/spark/file-single-value.png)

So far so good. Now the first thing we need to do is break the file into rows. There are a few options in Spark for this, but fewer in spark-dotnet today; there is talk of implementing createDataFrame, but right now neither that nor the RDD functions are available. In the original Stack Overflow post I suggested putting the rows in a list and calling toDF on it to create a data frame, but that isn't available here either. Other things that would have been perfect, createDataFrame and parallelize, are also missing (maybe by the time you read this they are available, let's hope :) ).

I decided to use a UDF to split the data into rows and columns. It could have been implemented in pure Spark because the rows and columns are fixed length, but what if there were different row types we needed to handle, or a row-size field we needed to read? I follow my usual standard of going with the easiest solution first and, if it proves not to be fast enough, looking at re-writing it:

spark.Udf().Register<string, List<string[]>>("ParseTextFile", (text) => Split(text));

We take a single string and return a list containing arrays of strings; some fields are not really strings, but we will cast those to the data types we actually want later on. The UDF (note this is pure C#, not Spark - our UDF knows nothing of Spark):


        private static List<string[]> Split(string textFile)
        {
            //each record in the file is a fixed 84 characters (including the embedded newline)
            const int chunkSize = 84;
            int fileLength = textFile.Length;
            var list = new List<string[]>();
            for (var offset = 0; offset < fileLength; offset += chunkSize)
            {
                //grab one 84 character record and slice it into its fixed-width columns
                var row = textFile.Substring(offset, chunkSize);
                list.Add(
                    new string[]
                    {   //yuck! hard-coded column offsets and lengths
                        row.Substring(0, 1), row.Substring(1, 8), row.Substring(9, 15), row.Substring(24, 11), 
                        row.Substring(35, 11), row.Substring(46, 1), row.Substring(47, 3), row.Substring(50, 3),
                        row.Substring(54, 1), row.Substring(55, 25), row.Substring(80, 4)
                    }
                );
            }

            return list;
        }

Here we return a list of string arrays, each array having one value per column. The next thing to do is take the single string we read in and convert it into rows, each row having an array of strings:

        var exploded = file.Select(Functions.Explode(
            Functions.CallUDF("ParseTextFile", file.Col("value"))
        ));

        exploded.Show();

Single row exploded into multiple rows(https://the.agilesql.club/assets/images/spark/file-exploded.png)

Here we have taken the value from the file data frame and passed it to our UDF, and the result is passed to Microsoft.Spark.Sql.Functions.Explode - this function takes a list and creates a new row for every item in that list. We could stop there, but all we have is rows of arrays, so let's do a select over that array and convert each item into a column, doing some type conversion and date parsing along the way:

var allColumns = exploded
                    .Select(exploded.Col("col").GetItem(0), 
                            Functions.ToDate(exploded.Col("col").GetItem(1), "yyyyMMdd"),
                            exploded.Col("col").GetItem(2),
                            exploded.Col("col").GetItem(3),
                            exploded.Col("col").GetItem(4),
                            exploded.Col("col").GetItem(5),
                            exploded.Col("col").GetItem(6),
                            exploded.Col("col").GetItem(7).Cast("float"),
                            exploded.Col("col").GetItem(8),
                            exploded.Col("col").GetItem(9),
                            exploded.Col("col").GetItem(10).Cast("int")
                            );
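
As an aside - because the columns are fixed width, the slicing itself could have been done in “pure” Spark, as I mentioned earlier. If the UDF had just returned a List<string> of whole 84 character records rather than arrays, the exploded column would be a plain string and Functions.Substring (which is 1-based) could cut it up. This is only a sketch, assuming a hypothetical ParseTextFileToRows UDF and showing just the first few columns:

        var rows = file.Select(Functions.Explode(
            Functions.CallUDF("ParseTextFileToRows", file.Col("value"))    //hypothetical UDF returning List<string>
        ).Alias("row"));

        var sliced = rows.Select(
            Functions.Substring(rows.Col("row"), 1, 1).Alias("RecType"),
            Functions.ToDate(Functions.Substring(rows.Col("row"), 2, 8), "yyyyMMdd").Alias("Date"),
            Functions.Substring(rows.Col("row"), 10, 15).Alias("Productnumber")
            //...and so on for the remaining columns
        );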

Finally, let's add column names, check our schema and make sure the data looks good:

var final = allColumns.ToDF("RecType", "Date", "Productnumber", "TAG", "Contract", "Filler1", "Code", "Version", "newline", "FILENAME", "Recnumber");
final.PrintSchema();
final.Show();

The final data frame with column names and types(https://the.agilesql.club/assets/images/spark/file-all-done.png)

There you have it - we have taken a single file with weird formatting and broken it into rows and columns, with nice column names and data types.

As I said at the beginning, I would pre-process this file before it gets to Spark so that the Spark side is just a CSV read (or Parquet or something). When we want to project a new set of rows, today with spark-dotnet we are limited, but Explode is definitely a friend.
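
That pre-processing does not need to be anything clever - a minimal sketch in plain .NET (no Spark involved), re-using the Split helper from above, might look like this. The .psv output path, the pipe delimiter and the newline stripping are all just choices made up for this example:

        var text = System.IO.File.ReadAllText(@"C:\git\files\newline-as-data.txt");

        //slice each 84 character record into columns (re-using the Split helper above),
        //strip any embedded newlines out of the values and join them with a delimiter
        //we know is not in the data (needs a using System.Linq for the Select calls)
        var lines = Split(text)
                        .Select(cols => string.Join("|", cols.Select(c => c.Replace("\r", "").Replace("\n", ""))));

        System.IO.File.WriteAllLines(@"C:\git\files\newline-as-data.psv", lines);

        //the Spark side then becomes a plain delimited read
        var preProcessed = spark.Read().Option("sep", "|").Csv(@"C:\git\files\newline-as-data.psv");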

For a working example see:

https://github.com/GoEddie/dotnet-spark-examples/tree/master/examples/chunk-file

Do you know any other ways to use spark-dotnet to create a data frame without reading a file? I would love to hear other solutions to this so be sure to let me know!

UPDATE!

The dotnet driver now has Range, so you can also do select id, 'yoyo' as yo from range(100) and get one hundred rows, and there is also now support for CreateDataFrame - a couple of awesome new features.
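
For example, something along these lines should work (a sketch against a recent Microsoft.Spark release - the exact overloads may differ by version, and the StructType/GenericRow types come from Microsoft.Spark.Sql and Microsoft.Spark.Sql.Types):

        //range, either through SQL or through the Range method on SparkSession
        var fromSql = spark.Sql("select id, 'yoyo' as yo from range(100)");
        var fromRange = spark.Range(100).WithColumn("yo", Functions.Lit("yoyo"));

        //CreateDataFrame lets us build a data frame from local data without reading a file
        var schema = new StructType(new[]
        {
            new StructField("id", new IntegerType()),
            new StructField("name", new StringType())
        });

        var data = new List<GenericRow>
        {
            new GenericRow(new object[] {1, "ed"}),
            new GenericRow(new object[] {2, "edd"})
        };

        var built = spark.CreateDataFrame(data, schema);
        built.Show();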