How do we test ETL pipelines? Part one: unit tests

Why do we bother testing?

Testing isn’t an easy thing to define. We all know we should do it; when something goes wrong in production people shout and ask where the tests were, and hell, even auditors like to see evidence of tests (whether or not they are good isn’t generally part of an audit). But what do we test, how do we test it, and why do we even write tests? It is all well and good saying “write unit tests and integration tests”, but what do we actually test? We are writing ETL pipelines, and the people who run the source system can’t even tell us whether the files are in CSV, JSON, or double-dutch – they certainly don’t have a schema that persists for more than ten minutes.

I’ve been testing ETL pipelines (amongst other things) seriously for over a decade and, for me, the main thing testing gives me isn’t fewer bugs (that does happen, but it isn’t the primary goal); what it gives me is the ability to move faster and so deliver business value quicker. Having a good suite of tests will:

  • Give you living examples of how your code should be used
  • Give you living examples of how data sources and data destinations are meant to look (this is so key in ETL, where source systems often change more often than the hands on a clock)
  • Ensure that changes to code or configuration don’t change the way the pipeline behaves
  • Prove that the code you write ends up being deployed, un-impeded by build and deploy processes
  • When bugs do occur, let you find and fix them faster

Quick note about production

We test in production. This means we have monitoring and do things like phased roll-outs using feature flags, or we roll out to select customers first, prove the change works, then roll it out to everyone else. Testing in production doesn’t mean hacking around trying to get some process to work. We don’t test “on production” (hacking); we test “in production” – while we are in production we are continually testing, and if anything goes wrong, we have alerts and can deal with it.

Types of testing we can do

These are the definitions I use; not everyone uses the same ones. Generally, tests fall into this pattern, although they are flexible, and the more you have in each environment, the better. For example, on your dev machine you will likely run unit tests as well as integration tests, and the events you monitor for in production you also monitor for locally, but probably with an assert rather than an ELK stack. You will also likely run your integration tests on your CI server, but less likely in production.

The general principle is that on the left we have the fastest-running, lowest-complexity tests, which are also the least likely to give us confidence that anything in production works; as we move further to the right, we get more and more confidence that everything is good. The ultimate test for code is when it is in production and we have monitoring to show that everything is good, so we can check in and deploy anytime (even on a Friday):

Testing ETL Processes – Types of test we need to consider (https://the.agilesql.club/assets/images/etl/test-type-names.png)

In this post we will talk about unit testing; later posts will discuss integration testing (the excitement is strong!).

A quick note on language

My background is as a Microsoft SQL Server developer (this blog is mostly SQL based), but ETL with MS SQL Server typically means SSIS, and I have two problems with SSIS: one, it is hard to unit test (forget xSSIS Unit), and two, there are so many stupid dialogs it is really infuriating. So a post about unit testing SSIS typically goes: you can’t unit test SSIS – have a read of this, and it should become clearer why (you need to split out the business logic, and SSIS munges all the code together in one massive XML turd).

The examples here are in Python 3 targeting Spark, but please follow along because the principles are the same for any dev work (I promise, I have used these in C, C++, C#, Go, TypeScript, T-SQL (yes, really!), Python, Scala, even SSIS).

Unit Testing ETL Pipelines

Unit tests are small tests that, typically, test business logic. A unit test checks that a line of code, or a small set of lines of code, does one thing. Unit tests don’t prove whether a pipeline works, not even close, but that is fine – we have other tests for that. Typically, what I would like to see unit tested in an ETL pipeline is the business logic, which normally sits in the “T” phase but can reside anywhere. If we consider this dataset as our source dataset:

col_a | col_b | col_c | col_d
------|-------|-------|------
100   | 'y'   | true  | null
200   | 'z'   | true  | null

The key to unit testing is splitting the business logic from the “plumbing” code. For example, if we are writing Python for Apache Spark and we want to read in this text file and then save just the rows with a ‘z’ in “col_b”, we could do this:


from time import gmtime, strftime

from pyspark.sql import SparkSession


def main(inputFile, outputFile):

    print(strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime()))

    spark = SparkSession.builder.master('local').appName('Cool ETL Pipeline').getOrCreate()

    # read the CSV, keep just the rows where col_b is 'z', then write them back out as parquet
    df = spark.read.option('header', True).csv(inputFile)

    input = df.filter("col_b == 'z'")

    input.show()

    input.write.parquet(outputFile, mode='overwrite')

    print(strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime()))

This reads an input file, filters out the rows where col_b isn’t ‘z’, and then writes the data back out as parquet. If we look at this, the business logic is the col_b == 'z' bit; everything else is what I would call “plumbing” code. We will test the plumbing bits later on with integration tests, but for unit tests all we care about is whether the col_b == 'z' part is accurate, well documented and maintainable.

Testing the string col_b == 'z' directly is quite hard with unit tests. As an aside, there is no intellisense for blocks of random SQL in strings, and therefore no useful things like refactoring support or compile-time verification; it is just a string.

When we approach unit testing, we have some choices:

  1. Write a test that runs the whole function; this would require creating a physical “input.csv” and checking the “output” parquet file (a sketch of this appears below)
  2. Move the string into a global (or somehow shared) constant and have a test that runs those specific strings
  3. Separate the business logic from the plumbing code to be able to test it

In case you were unsure, the correct answer is 3 :)
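For contrast, here is a rough sketch of what option 1 might look like; this assumes pytest (with its tmp_path fixture), a local Spark install, and that the main() function above lives in a module I will call etl_job (a made-up name). It needs real files on disk and a real Spark session, which is exactly why we don’t want every test to look like this:

import csv

from pyspark.sql import SparkSession

from etl_job import main  # hypothetical module holding the main() shown above


def test_whole_pipeline_keeps_only_z_rows(tmp_path):
    # write a physical input file for the pipeline to read
    input_file = tmp_path / "input.csv"
    with open(input_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["col_a", "col_b", "col_c", "col_d"])
        writer.writerow(["100", "y", "true", "null"])
        writer.writerow(["200", "z", "true", "null"])

    output_file = tmp_path / "output"

    # run the whole function: this spins up Spark and reads and writes real files
    main(str(input_file), str(output_file))

    # read the parquet back and check that only the 'z' row survived
    spark = SparkSession.builder.master("local").getOrCreate()
    rows = spark.read.parquet(str(output_file)).collect()
    assert len(rows) == 1
    assert rows[0]["col_b"] == "z"

It works, but it is slow, it tests everything at once, and when it fails it won’t tell you whether the business logic or the plumbing is at fault.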

With Spark we can include strings of SQL or we can write the code in Python, Scala, .NET, etc. What we should do, then, is create a new file containing the filter, which we import and can call either from our Spark code or from unit tests:

# filter.py
def is_z_record(col_b):
    return "z" == col_b

Then, in our main function, we import this file and call the function; we also need to register it as a UDF in Spark:

from time import gmtime, strftime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

from filter import is_z_record


def main(inputFile, outputFile):

    print(strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime()))

    spark = SparkSession.builder.master('local').appName('Cool ETL Pipeline').getOrCreate()

    # wrap the plain python function in a UDF so Spark can call it row by row
    filter = udf(is_z_record, BooleanType())

    df = spark.read.option('header', True).csv(inputFile)

    input = df.filter(filter(df['col_b']))

    input.show()

    input.write.parquet(outputFile, mode='overwrite')

    print(strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime()))

We can also add some unit tests:

from filter import is_z_record

def test_filter_includes_z_records():
    assert is_z_record('z')

def test_filter_excludes_non_z_records():
    assert not is_z_record('x')

def test_filter_excludes_rows_with_z_anywhere_in_col_b():
    assert not is_z_record('.z.')

Now, if in a year’s time we want to know how the filtering works on this pipeline, we can open the tests and quite confidently say “we include rows where col_b is exactly ‘z’ and exclude everything else, including rows that merely contain a ‘z’ somewhere” - WOW, what a win for your future self or other developers.

Cry, Cry, Cry, but what about performance?

We are going to take a quick break from unit testing to talk about performance. There is a real tendency when programming to try to predict where things will be slow, and this is especially bad if you have experience of fixing performance issues. You see, in this small app we have taken an embedded string and converted it to a UDF. A UDF in python is slower because the data is passed from the Spark JVM process via a socket to a python process, which does its bit and then returns the value. If you have a gazillion rows, then this will be slightly slower. To make testing easier, we have made the pipeline slower! OMG, this is terrible.

Actually, it probably isn’t. The thing is, you need to be realistic about this: how long does it take to read the data? If it takes your process from ten minutes to ten minutes thirty seconds, does it matter? If it really does matter, then put it back. If you consider that 99.99999999% of the time having a better unit test is worth more than slightly faster code, then always go with the easier testing, and for the remaining cases make a conscious decision to sacrifice testability for that last little squeeze of performance.
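If you do measure it and the python UDF really is the bottleneck, you don’t have to go straight back to a string of SQL. As a rough sketch (assuming Spark 3.x with pyarrow installed; check your own versions), a pandas UDF keeps the logic in the same unit-tested function but moves the data across the JVM/python boundary in batches rather than row by row:

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import BooleanType

from filter import is_z_record


@pandas_udf(BooleanType())
def z_filter(col_b: pd.Series) -> pd.Series:
    # still the same tested function; the data just arrives in arrow batches.
    # For a fully vectorised version you could return col_b == 'z' directly.
    return col_b.apply(is_z_record)

You would use it exactly like the earlier UDF, df.filter(z_filter(df['col_b'])), and the unit tests for is_z_record don’t change at all.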

I once worked on a C# .NET app that had to be fast, real fast, so I wrote it without using any virtual methods (virtual methods are slightly slower, I had had issues previously, and I optimised prematurely). This meant that unit testing was hard (you couldn’t mock non-virtual methods in .NET at the time using Moq), and I ended up using a framework from Microsoft called Moles which was, quite frankly, horrible. It kept causing us issues with development until someone else ripped it out and started using virtual methods; then, before I knew it, we had sane unit testing and performance was not noticeably different. It was a real win for everyone to go with the slightly slower approach.

Back to the testing

Now the business owner comes along and says that you need to extend the functionality so that when “col_d” is the string “null” (confusing, right? This is the sort of nonsense we have to put up with!) we should replace the “null” string with a concat of “col_a” and “col_b”. One approach might be to save the rows with a “null” col_d to a temp view so that they can be queried with Spark SQL, then union the result back with the rest of the rows (filtered to exclude the nulls):


    spark = SparkSession.builder.master('local').appName('Cool ETL Pipeline').getOrCreate()

    filter = udf(is_z_record, BooleanType())

    df = spark.read.option('header', True).csv(inputFile)

    input = df.filter(filter(df['col_b']))

    #  In spark we can take a data frame and save it so it can be queried with SQL, awesome!
    #  I filtered to just the "null" col_d rows so we don't need a where in our SQL,
    #  we'll union the other rows back in later
    input.filter("col_d == 'null'").createOrReplaceTempView("nullrows")

    #  || = concat
    nullRows = spark.sql("select col_a, col_b, col_c, col_a || col_b col_d from nullrows")

    nullRows.union(input.filter("col_d != 'null'")).write.parquet(outputFile, mode='overwrite')

Now, here again, we have a block of SQL that is hard to test and can’t easily be refactored (things like go-to-definition are useless, etc.), so let’s think about how to make it more testable. What do we want? Well, we want to test the business logic, and the business logic here is now something like:

  • Filter the data to ‘z’ rows and, separately, filter for ’null’ in ‘col_d’
  • Concat col_a and col_b where col_d is null
  • Create one dataset with all the rows

So we add a new filter:


# filter.py
def is_z_record(col_b):
    return "z" == col_b

def is_null_record(col_d):
    return "null" == col_d

Then we register the new UDF as well (I know, worry about performance later, if it is a problem):

    spark = SparkSession.builder.master('local').appName('Cool ETL Pipeline').getOrCreate()

    z_filter = udf(is_z_record, BooleanType())

    null_filter = udf(is_null_record, BooleanType())

    df = spark.read.option('header', True).csv(inputFile)

    input = df.filter(z_filter(df['col_b']))

    #  In spark we can take a data frame and save it so it can be queried with SQL, awesome!
    #  I filtered it so we don't need a where in our SQL, we'll union the other rows back in later
    input.filter(null_filter(input['col_d'])).createOrReplaceTempView("nullrows")

    #  || = concat
    nullRows = spark.sql("select col_a, col_b, col_c, col_a || col_b col_d from nullrows")

    nullRows.union(input.filter(~null_filter(input['col_d']))).write.parquet(outputFile, mode='overwrite')

So we can add tests for our new filter, and we tidy up our existing tests because “filter” on its own is now ambiguous:

from filter import is_z_record, is_null_record

def test_z_filter_includes_z_records():
    assert is_z_record('z')

def test_z_filter_excludes_non_z_records():
    assert not is_z_record('x')

def test_z_filter_excludes_rows_with_z_anywhere_in():
    assert not is_z_record('.z.')

def test_null_filter_includes_null_records():
    assert is_null_record('null')

def test_null_filter_excludes_non_null_records():
    assert not is_null_record('something else')

def test_null_filter_excludes_rows_with_null_anywhere_in():
    assert not is_null_record('nullable')

So we now have some cool documentation to tell us what the new filter does, nice. We are testing the logic around filtering, but we aren’t testing the concat at all. What if, horror of horrors, we are concat’ing col_b and col_d rather than col_a and col_b? We could resort to another UDF, but let’s assume this part is really slow and we need to speed it up. The options with Apache Spark are either to do what we have done in the past, i.e. shove in a blob of SQL, or to write it in pure python using calls to Spark and, after encapsulating that business logic, at least make sure the right methods were called.

What does “make sure the right methods were called” actually mean, and what does it give us?

If you picture a method or function, there are typically two types:

  • Ones that take some parameters (or not) and return a value; they don’t call anything else, they are the masters of everything they do
  • Ones that take some parameters (or not) and call other function(s) which do other things

The first kind are easy to test, like our filters: we pass in some known parameter and check the response. These are ideal, and the more of these you have the better. The second kind are slightly harder, but testable in a different way. I can explain better if we move our logic into its own function:

    def process_pipeline(self, df):

        zeds = self.pipeline_filters.filter_to_z_rows(df, df.col_b)

        null_zeds = self.pipeline_filters.filter_to_null_rows(zeds, zeds.col_d)

        non_null_zeds = self.pipeline_filters.filter_to_non_null_rows(zeds, zeds.col_d)

        no_longer_null_zeds = null_zeds.withColumn("col_d", concat(null_zeds.col_a, null_zeds.col_b))

        final_data = non_null_zeds.union(no_longer_null_zeds)

        return final_data

We now have our business logic in one place: take the ‘z’ rows, then from those rows split out the “null” and non-“null” “col_d”s, and for the “null” ones replace “col_d” with a concat of “col_a” and “col_b”; finally, return the unioned data so we include all the rows. How you actually implement this is up to you, there are a million ways; this is just to demonstrate unit testing and approaches to unit testing.
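For completeness, here is a rough sketch of how this might hang together in a module. The Pipeline class and the src/pipeline.py path come from the tests below; PipelineFilters is a name I have made up for the collaborator that wraps the filters. The important design choice is that the filters are passed into the constructor so that unit tests can hand in mocks:

# src/pipeline.py (assumed layout)
from pyspark.sql.functions import concat, udf
from pyspark.sql.types import BooleanType

from filter import is_z_record, is_null_record


class PipelineFilters:
    """Thin, Spark-aware wrappers around the unit-tested filter functions."""

    def __init__(self):
        self.z_filter = udf(is_z_record, BooleanType())
        self.null_filter = udf(is_null_record, BooleanType())

    def filter_to_z_rows(self, df, col):
        return df.filter(self.z_filter(col))

    def filter_to_null_rows(self, df, col):
        return df.filter(self.null_filter(col))

    def filter_to_non_null_rows(self, df, col):
        return df.filter(~self.null_filter(col))


class Pipeline:
    def __init__(self, pipeline_filters):
        # injected so unit tests can pass in a mock instead of the real Spark-backed filters
        self.pipeline_filters = pipeline_filters

    def process_pipeline(self, df):
        zeds = self.pipeline_filters.filter_to_z_rows(df, df.col_b)
        null_zeds = self.pipeline_filters.filter_to_null_rows(zeds, zeds.col_d)
        non_null_zeds = self.pipeline_filters.filter_to_non_null_rows(zeds, zeds.col_d)
        no_longer_null_zeds = null_zeds.withColumn("col_d", concat(null_zeds.col_a, null_zeds.col_b))
        return non_null_zeds.union(no_longer_null_zeds)

The plumbing code then just calls Pipeline(PipelineFilters()).process_pipeline(df) and writes out the result; that part we will cover with integration tests.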

The first thing to note is that we could just write a test that actually calls Spark, but Spark takes a while to start up and we want unit tests to be fast (forget the premature-optimisation bit for this, ha ha, my blog, my prerogative to pick and choose whatever and whenever I want!).

The second thing is that the unit tests aren’t about proving the whole process works, that is for later; the unit tests allow us to see how this was implemented and what the hell the developer was thinking. The important things here are that:

  • The original data is filtered to just rows with ‘z’ in col_b
  • We store just the ‘z’ records that have a null col_d
  • We, separately, store the ‘z’ records that don’t have a null col_d
  • withColumn is called to create a new “col_d” with the output from concat
  • Finally the two intermediate data frames are union’d

Now, remember, we could just call Spark and ask it to do all this processing and spit out the results. Now, however, is not the time - running Spark is time-consuming and wasteful for developers making quick changes; that will come later. For now we want to mock out all the external dependencies.

If we take the business logic that we have and write it out in natural English like above, we can start to define the actual tests:

    def test_data_set_was_filtered_to_just_zeds_in_col_b(self):
        TestPipeline.filters.filter_to_z_rows.assert_called_with(TestPipeline.data_frame, TestPipeline.data_frame.col_b)

    # The 'z' records were then filtered again into a set of null's...
    def test_zeds_were_filtered_to_nulls_in_col_d(self):
        TestPipeline.filters.filter_to_null_rows.assert_called_with(TestPipeline.data_frame_just_zeds, TestPipeline.data_frame_just_zeds.col_d)

    # ... and a set of non-nulls (both using col_d)
    def test_zeds_were_filtered_to_non_nulls_in_col_d(self):
        TestPipeline.filters.filter_to_non_null_rows.assert_called_with(TestPipeline.data_frame_just_zeds, TestPipeline.data_frame_just_zeds.col_d)

    # the null records had withColumn called, passing in the new column name (col_d) and the result of concat (we mocked concat so we didn't have to start spark)
    def test_null_zeds_got_new_col_d_with_result_of_concat(self):
        TestPipeline.data_frame_null_zeds.withColumn.assert_called_with("col_d", 'concat_was_called')

    # was concat called with col_a and col_b from the null data set?
    def test_concat_was_called_with_col_a_and_col_b_from_null_zeds_data_frame(self):
        TestPipeline.mock_concat.assert_called_with(TestPipeline.filters.filter_to_null_rows().col_a, TestPipeline.filters.filter_to_null_rows().col_b)

    def test_non_null_records_and_previously_null_records_but_now_also_non_null_are_unioned_to_create_one_big_dataset_again(self):
        TestPipeline.filters.filter_to_non_null_rows().union.assert_called()

Now if anyone asks “what is the business logic behind this?” we can look at the code and the tests and have a pretty good idea of how the code works. The key thing here is to keep each test small. What I have done in this example is create a test class that sets up the mocks and runs the code under test just once, then moved the asserts into individual tests. A fixture probably would have been better, but this works for now to get us going (see the sketch after the class below) - remember, when you have tests you can refactor the code (production code and tests!), it doesn’t have to be perfect:


from unittest import mock
from unittest.mock import patch

from src.pipeline import Pipeline


class TestPipeline:

    with patch('src.pipeline.concat', return_value = 'concat_was_called') as mock_concat:

        # Setup Mocks
        filters = mock.Mock()
        data_frame = mock.Mock()

        data_frame.col_b.return_value = 'col_b'
        
        data_frame_just_zeds = mock.Mock()
        data_frame_null_zeds = mock.Mock()
        data_frame_non_null_zeds = mock.Mock()
        data_frame_zeds_with_new_column = mock.Mock()

        filters.filter_to_z_rows.return_value = data_frame_just_zeds
        filters.filter_to_null_rows.return_value = data_frame_null_zeds
        filters.filter_to_non_null_rows.return_value = data_frame_non_null_zeds

        data_frame_null_zeds.withColumn.return_value = data_frame_zeds_with_new_column

        pipeline = Pipeline(filters)
        
        #  Execute Code Under Test
        pipeline.process_pipeline(data_frame)

    #The original data frame was filtered to just 'z' records using 'col_b'
    def test_data_set_was_filtered_to_just_zeds_in_col_b(self):
        TestPipeline.filters.filter_to_z_rows.assert_called_with(TestPipeline.data_frame, TestPipeline.data_frame.col_b)

    # The 'z' records were then filtered again into a set of null's...
    def test_zeds_were_filtered_to_nulls_in_col_d(self):
        TestPipeline.filters.filter_to_null_rows.assert_called_with(TestPipeline.data_frame_just_zeds, TestPipeline.data_frame_just_zeds.col_d)

    # ... and a set of non-nulls (both using col_d)
    def test_zeds_were_filtered_to_non_nulls_in_col_d(self):
        TestPipeline.filters.filter_to_non_null_rows.assert_called_with(TestPipeline.data_frame_just_zeds, TestPipeline.data_frame_just_zeds.col_d)

    # the null records had withColumn called, passing in the new column name (col_d) and the result of concat (we mocked concat so we didn't have to start spark)
    def test_null_zeds_got_new_col_d_with_result_of_concat(self):
        TestPipeline.data_frame_null_zeds.withColumn.assert_called_with("col_d", 'concat_was_called')

    # was concat called with col_a and col_b from the null data set?
    def test_concat_was_called_with_col_a_and_col_b_from_null_zeds_data_frame(self):
        TestPipeline.mock_concat.assert_called_with(TestPipeline.filters.filter_to_null_rows().col_a, TestPipeline.filters.filter_to_null_rows().col_b)

    def test_non_null_records_and_previously_null_records_but_now_also_non_null_are_unioned_to_create_one_big_dataset_again(self):
        TestPipeline.filters.filter_to_non_null_rows().union.assert_called()
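And, since I said a fixture probably would have been better, here is a sketch of what that might look like with pytest (same assumed src.pipeline module as above). The setup still runs only once per module and each test just unpacks the mocks it cares about:

import pytest

from unittest import mock
from unittest.mock import patch

from src.pipeline import Pipeline


@pytest.fixture(scope="module")
def pipeline_run():
    # run the pipeline once against mocks and hand those mocks to every test
    with patch('src.pipeline.concat', return_value='concat_was_called') as mock_concat:
        filters = mock.Mock()
        data_frame = mock.Mock()
        data_frame_just_zeds = mock.Mock()
        data_frame_null_zeds = mock.Mock()

        filters.filter_to_z_rows.return_value = data_frame_just_zeds
        filters.filter_to_null_rows.return_value = data_frame_null_zeds

        Pipeline(filters).process_pipeline(data_frame)

        yield filters, data_frame, data_frame_just_zeds, data_frame_null_zeds, mock_concat


def test_data_set_was_filtered_to_just_zeds_in_col_b(pipeline_run):
    filters, data_frame, _, _, _ = pipeline_run
    filters.filter_to_z_rows.assert_called_with(data_frame, data_frame.col_b)


def test_null_zeds_got_new_col_d_with_result_of_concat(pipeline_run):
    _, _, _, data_frame_null_zeds, _ = pipeline_run
    data_frame_null_zeds.withColumn.assert_called_with("col_d", 'concat_was_called')

The remaining asserts move across in exactly the same way.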

But, the pipeline doesn’t work!

So we have our business logic separated from our “plumbing” code, and we have a set of tests for the business logic; what about unit tests for the rest of it? How do we know the pipeline actually works? We don’t.

Unit tests don’t tell us that the whole pipeline works, for that we need integration tests.

But consider this: the feeling of being able to look at your tests, write more tests and get the code working is so satisfying that, as a developer, I wouldn’t want to work any other way. (Side note: someone will say “but what if you write your tests wrong?” - from experience I can tell you that sometimes you will have bugs in your tests, and it is annoying, but I would rather that than have no tests at all just in case there might be bugs in them. Not writing unit tests is really short-sighted.)

I will leave you with a story about a project where writing unit tests meant the delivery was roughly on time; if we hadn’t had tests we would have really struggled. We had a fixed delivery deadline, changing requirements, and an organisation that insisted on a waterfall approach. The deadline was genuinely fixed and could not be moved. Our task was to write T-SQL rules; a rule was fairly simple, such as checking a column for valid post codes (zip codes). Each rule might have one or more tests. The rules were named Rule_1 to Rule_52 and so on, and because we had different tests for each rule we might have Rule_1_A, Rule_2_B, etc. The cryptic nature did not help in the slightest.

The approach we took was to write a set of tests for each rule and test; the tests would be in the format Rule_1_a_Postcode_Is_6_or_7_digits(), Rule_1_a_Postcode_Begins_With_Two_Letters_In_Specific_Set(), Rule_1_a_Postcode_Digits_3_4_are_numeric_when_length_6(), Rule_1_a_Postcode_Digits_3_4_5_are_numeric_when_length_7(), etc. – you get the picture.

Any developer might have written any given rule, so there was no code ownership and no single expert in one area.

The business analysts would, on a daily basis, change the rules or consult us as to how they worked, and any free developer was able to discuss how we had implemented any rule just by looking at the test names - this saved the project, I am 100% sure of it.

Of course we had bugs, but when we did we added more unit tests, and in the end the rules themselves were pretty solid.

Unit testing here didn’t help us find more bugs but helped us deal with constantly changing requirements.