Delta Lake over Spark Connect
I have just finished an update to the spark connect dotnet lib that adds a DeltaTable implementation, so we can now use .NET to maintain delta tables, over and above what we already get out of the box with DataFrame.Write.Format("delta").
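For reference, the out-of-the-box route is a plain DataFrameWriter call; a minimal sketch, assuming deltaPath points at the table's storage location:

// write a DataFrame in delta format - this part works without the DeltaTable API
var df = spark.Range(10).WithColumn("name", Lit("teddy"));
df.Write.Format("delta").Mode("overwrite").Save(deltaPath);

The DeltaTable API covers everything beyond that initial write; this is an example of how to use the delta api from .NET: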
// open an existing delta table from its storage path
var deltaTable = DeltaTable.ForPath(spark, deltaPath);

// show the 10 most recent entries in the table's commit history
deltaTable.History().Show(10, 1000);

// set id to 0 on every row where id < 10
deltaTable.Update("id < 10", (Col("id"), Lit(0)));
deltaTable.ToDF().Show(20);

// build a source DataFrame to merge into the table
var source = spark.Range(5).WithColumn("name", Lit("teddy")).Alias("src");
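// upsert: update matching rows, insert the rest, evolving the schema if needed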
deltaTable
    .As("tgt")
    .Merge(source, "src.id = tgt.id")
    .WithSchemaEvolution()
    .WhenNotMatchedInsertAll()
    .WhenMatchedUpdateAll()
    .Execute(spark);
deltaTable.ToDF().Show(50);
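// roll the table back to version 1 of its history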
deltaTable.RestoreToVersion(1);
deltaTable.ToDF().Show(50);
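// delete every row (the Lit(true) condition matches everything)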
deltaTable.Delete(Lit(true));
deltaTable.ToDF().Show(50);
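// show the table's metadata: format, location, size and so on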
deltaTable.Detail().Show();
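// create (or replace) a table with an explicit schema, including a generated column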
var newDeltaTable = DeltaTable
    .CreateOrReplace(spark)
    .TableName("deltatablefuntest")
    .AddColumn(
        new DeltaTableColumnBuilder("col_a")
            .DataType("int")
            .Build()
    )
    .AddColumn(
        new DeltaTableColumnBuilder("col_b")
            .DataType("int")
            .GeneratedAlwaysAs("1980")
            .Nullable(false)
            .Build()
    )
    .Execute();
newDeltaTable.ToDF().Show();
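// rows to merge into the new table, aliased as "source" for the merge condition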
var sourceDataFrame = spark.CreateDataFrame(
    new List<(int, long)>
    {
        (1, 1980), (2, 1980), (3, 1980), (4, 1980), (5, 1980), (6, 1980)
    }.Cast<ITuple>(),
    new StructType(new List<StructField>
    {
        new StructField("this_is_cola", new IntegerType(), false),
        new StructField("colb", new BigIntType(), false)
    })).Alias("source");
sourceDataFrame.Show();
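// merge with explicit column mappings between source and target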
newDeltaTable.As("target")
    .Merge(sourceDataFrame, "source.this_is_cola = target.col_a")
    .WhenMatchedUpdate(
        (Col("target.col_a"), Col("source.this_is_cola")),
        (Col("target.col_b"), Col("source.colb")))
    .WhenNotMatchedInsert(
        (Col("target.col_a"), Col("source.this_is_cola")),
        (Col("target.col_b"), Col("source.colb")))
    .WithSchemaEvolution()
    .Execute(spark);
newDeltaTable.ToDF().Show();
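// compact small files, then vacuum away files no longer referenced by the log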
newDeltaTable.Optimize().ExecuteCompaction();
newDeltaTable.Vacuum();
There isn’t really too much else to say. The only thing worth pointing out is how nicely the delta extensions work with Spark Connect: once I had figured out the first function, the rest were really easy to implement. The general pattern looks something like the sketch below.
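By "the delta extensions" I mean the extension points in the Spark Connect protocol: the Relation and Command protos each carry a google.protobuf.Any field, so a delta operation is just another message packed into an otherwise ordinary plan. A minimal sketch of the idea, not the library's actual code (WrapDeltaMessage is a hypothetical helper):

using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Spark.Connect;

// each delta operation is a message from the delta connect protos, packed
// into the Any `extension` field of a standard Spark Connect Relation so
// the delta plugin on the server can unpack and execute it
static Relation WrapDeltaMessage(IMessage deltaMessage) =>
    new Relation
    {
        Extension = Any.Pack(deltaMessage)
    };

Once that wrapping is in place, each DeltaTable method is just a different message to build.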
To get this to work I needed to add the proto files from the delta lake project to my local repo and compile them. While doing that I found I had made the original spark proto implementation more complex than it needed to be, so I have now ended up structuring the proto files like this:
Which is much nicer!
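If you are wiring up something similar, proto compilation in a .NET project is typically driven by Grpc.Tools; here is a minimal sketch, where the protos folder layout is an assumption for illustration rather than my repo's actual structure:

<ItemGroup>
  <!-- compile the spark connect and delta connect protos into C# types;
       GrpcServices="None" for delta because its messages are plain payloads,
       not gRPC service definitions -->
  <Protobuf Include="protos\spark\**\*.proto" ProtoRoot="protos" GrpcServices="Client" />
  <Protobuf Include="protos\delta\**\*.proto" ProtoRoot="protos" GrpcServices="None" />
</ItemGroup>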