Apache Spark from PHP - it is not just a .NET thing

Goal of this post

I wanted to explore what the Spark Connect API looks like from other languages. I am not a php developer: I used it a long time ago and have read up on some of the modern changes, so apologies if I insult any php-ers! I will say that I quite like php.

Setup

The instructions are from https://grpc.io/docs/languages/php/quickstart/ with an extra step. The example needs a load of dependencies, so I thought the simplest way to get this running was to get the existing php gRPC example working first, add my code in-place as an extra example, and then reuse the existing dependencies to build and run it.

git clone --recurse-submodules -b v1.60.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc
cd grpc
git remote add -f sparkphp https://github.com/GoEddie/-spark-connect-blogs-php.git
git subtree add --prefix examples/php-spark sparkphp main --squash

We now need to build the dependencies in the grpc repo; the instructions are in the BUILDING.md file in the repo. I was on a mac with the Xcode command line tools already installed, so I only needed to do:

brew install composer
brew install bazel

and then:

LIBTOOL=glibtool LIBTOOLIZE=glibtoolize make

When that was finished I could build the php example:

bazel build :all
bazel build @com_google_protobuf//:protoc //src/compiler:all

If that succeeds then run:

examples/php/greeter_proto_gen.sh
cd examples/php
composer install

The php example is a client, so we will need a server to respond to it. gRPC for php doesn’t include a server, but node has a simple one we can use:

https://grpc.io/docs/languages/node/quickstart/

Change to a new directory outside of the grpc repo and run:

# Clone the repository to get the example code
git clone -b @grpc/grpc-js@1.9.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc-node
# Navigate to the node example
cd grpc-node/examples
# Install the example's dependencies
npm install
# Navigate to the dynamic codegen "hello, world" Node example:
cd helloworld/dynamic_codegen
node greeter_server.js

(You will need Node.js installed.)

Then go back to the examples/php folder in the grpc repo:

./run_greeter_client.sh

This should print “Hello World” to the console, which means that you have gRPC running and calling a server from php. Stop the node server using ctrl+c and then we will move on to our spark example.

TIP: Please make sure this works before you continue. I had an error with the grpc module not being found, so I worked out where it had been built and where php was looking for it, then copied it into the right place (see https://stackoverflow.com/questions/67896868/grpc-installation-unable-to-load-dynamic-library-grpc-so)
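If you hit the same "module not found" problem, it can help to ask php directly whether the extension is loaded and where it looks for extensions. The following is a minimal sketch: describeExtension is a hypothetical helper name of mine, and it only uses built-in php functions.

```php
<?php
// Hypothetical helper: report whether an extension is loaded and where PHP looks for it.
function describeExtension(string $name): array
{
    return [
        'name'          => $name,
        'loaded'        => extension_loaded($name),
        // Directory PHP searches for extension files such as grpc.so.
        'extension_dir' => (string) ini_get('extension_dir'),
        // The php.ini in use, or false if none was loaded.
        'ini_file'      => php_ini_loaded_file(),
    ];
}

$info = describeExtension('grpc');
echo 'grpc loaded: ' . ($info['loaded'] ? 'yes' : 'no') . PHP_EOL;
echo 'extension_dir: ' . $info['extension_dir'] . PHP_EOL;
echo 'php.ini: ' . ($info['ini_file'] ?: '(none)') . PHP_EOL;
```

Run it with the same -d extension=grpc.so flag the example scripts use, so that you are testing the same configuration.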

cd ../php-spark

Now create the php files from the spark proto definitions:

./spark_connect_proto_gen.sh

Now run composer:

composer install

Check that you have a spark connect server running:

$SPARK_HOME/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.0,io.delta:delta-spark_2.12:3.0.0

(Make sure you have SPARK_HOME set correctly etc)
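A quick way to check that something is listening before running the client is to try opening a TCP connection. This is a rough sketch: isPortOpen is a hypothetical helper of mine, localhost:15002 is the address the client code in this post connects to, and an open port only tells you that something is listening, not that it is definitely Spark Connect.

```php
<?php
// Hypothetical helper: return true if something accepts TCP connections on $host:$port.
function isPortOpen(string $host, int $port, float $timeoutSeconds = 2.0): bool
{
    $errno = 0;
    $errstr = '';
    // @ suppresses the warning fsockopen raises when the connection is refused.
    $socket = @fsockopen($host, $port, $errno, $errstr, $timeoutSeconds);
    if ($socket === false) {
        return false;
    }
    fclose($socket);
    return true;
}

echo isPortOpen('localhost', 15002)
    ? 'Something is listening on localhost:15002' . PHP_EOL
    : 'Nothing is listening on localhost:15002' . PHP_EOL;
```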

Then run the spark client:

./run_spark_client.sh

You should see some output and, in amongst some random junk, a table like this:

+---+
|id |
+---+
|0  |
|1  |
|2  |
|3  |
|4  |
|5  |
|6  |
|7  |
|8  |
|9  |
+---+

The Code

So if that is running, let's take a look at the code. In the Spark\Connect folder we get the generated code from the proto files:

[Image: Proto Project]

I thought it would be a good idea to start wrapping the client connection bits in a SparkSession:

class SparkSession{

    private $client;
    private $session_id;

    public function __construct(){

        $hostname = 'localhost:15002';

        $this->client = new Spark\Connect\SparkConnectServiceClient($hostname, [
            'credentials' => Grpc\ChannelCredentials::createInsecure(),
        ]);

        $this->session_id = $this->generateGUID();
    }

}
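The constructor calls generateGUID(), which isn't shown above. Spark Connect expects the session id to be a UUID-formatted string, so a minimal sketch of such a helper, written here as a standalone function rather than a method and not necessarily what the repo does, could generate a random version 4 UUID like this:

```php
<?php
// Hypothetical stand-in for the generateGUID() helper the constructor calls.
function generateGUID(): string
{
    $bytes = random_bytes(16);
    // Set the version to 4 (random) and the variant to RFC 4122.
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);
    // Format the 32 hex digits as 8-4-4-4-12.
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

echo generateGUID() . PHP_EOL;
```

Inside the class this would become a private method, so that $this->generateGUID() works as written.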

We will also need a way to execute plans:

    public function executePlan($plan){

        $request = (new Spark\Connect\ExecutePlanRequest())
                       ->setSessionId($this->session_id)
                       ->setPlan($plan)
                       ->setClientType('SPARK_CONNECT_PHP');

        // ExecutePlan is a server-streaming call, so we get back a stream of responses
        $response = $this
                       ->client
                       ->ExecutePlan($request);

        echo "executed." . PHP_EOL;

        foreach($response->responses() as $current) {

            echo $current->getSessionId() . '::' . $current->getResponseId() . '::' . $current->getOperationId() . '::' . $current->getResponseType() . PHP_EOL;

            // getResponseType() returns the name of whichever oneof field is set
            if($current->getResponseType() == 'arrow_batch'){
                $batch = $current->getArrowBatch();
                echo 'ARROW BATCH:' . PHP_EOL;
                // No Arrow decoder here, so dump the raw serialized bytes
                echo $batch->serializeToString() . PHP_EOL;
            }
        }
    }

Apache Arrow

There isn’t an Apache Arrow implementation for php. There is one for C, so if someone wanted to they could write a php extension, or a service in another language to handle it (or write a native php implementation). I didn’t feel like doing that, but I did want to see the output from the .show(), so I just use serializeToString, which prints the raw serialized batch. The table text is readable in amongst the binary data, which sort of works even if it isn’t perfect.

To use php with Spark Connect for real, I think this is definitely something that will need to be solved.
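Until then, one way to tidy up the .show() output is to pick the table lines out of the raw bytes. To be clear, this is a hack and not an Arrow decoder: it assumes the table text sits inside the serialized batch as plain text, and both extractShowTable and the sample bytes below are made up for illustration.

```php
<?php
// Hypothetical helper: pull the lines of a show()-style text table out of raw bytes.
// Not an Arrow decoder; it only works if the table text is stored as plain text.
function extractShowTable(string $bytes): string
{
    // Border lines look like +---+ and row lines like |id |; each must end a line.
    preg_match_all('/(\+-[-+]*\+|\|[^\n]*\|)(?=\n|$)/', $bytes, $matches);
    return implode("\n", $matches[1]);
}

// Made-up bytes standing in for a serialized batch: junk, then the table, then padding.
$fake = "\x00\x10\xff\xff" . "+---+\n|id |\n+---+\n|0  |\n|1  |\n+---+\n" . "\x00\x00";
echo extractShowTable($fake) . PHP_EOL;
```

In executePlan you could pass the result of $batch->serializeToString() through a function like this before echoing it. It will break if the binary data around the table happens to contain matching characters, so a real Arrow reader is still the proper fix.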

Back to the code

In our demo we will call spark.range():

public function range(int $end){
        $range = (new Spark\Connect\Range())
                    ->setEnd($end)
                    ->setStep(1)
                    ->setStart(0);

        $relation = (new Spark\Connect\Relation())
                        ->setRange($range);

        return new DataFrame($relation, $this);
    }

We will need something to hold the reference to the relation so a simple DataFrame class looks like:

class DataFrame{

    // Exposed as a property so usage matches the Scala and Python API: $dataFrame->write
    public DataFrameWriter $write;

    public function __construct(private Spark\Connect\Relation $relation, private SparkSession $session){
        // Create the writer up front so it is available without calling show() first
        $this->write = new DataFrameWriter($this->relation, $this->session);
    }

    public function show(int $rows){

        $showString = (new Spark\Connect\Relation())
                            ->setShowString((new Spark\Connect\ShowString())
                                                    ->setNumRows($rows)
                                                    ->setInput($this->relation));

        $plan = (new Spark\Connect\Plan())
                     ->setRoot($showString);

        $this->session->executePlan($plan);
    }

}

with a DataFrameWriter that is accessible from the write property so it is consistent with the Scala and Python API dataframe.write.blah:

class DataFrameWriter{
    
    public function __construct(private Spark\Connect\Relation $relation, private SparkSession $session){
    }

    public function parquet(string $path){

        $write = (new Spark\Connect\WriteOperation())
                    ->setInput($this->relation)
                    ->setPath($path)
                    ->setSource('parquet')
                    // 2 is SAVE_MODE_OVERWRITE in the Spark Connect proto
                    ->setMode(2);

        $writeCommand = (new Spark\Connect\Command())
                            ->setWriteOperation($write);

        $plan = (new Spark\Connect\Plan())
                    ->setCommand($writeCommand);

        $this->session->executePlan($plan);
    }
}
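The number passed to setMode is a bit of a magic value. As a sketch of how the writer could accept the mode names used by the Scala and Python APIs, here is a small lookup. SAVE_MODES and saveModeFromString are hypothetical names of mine, and the numbers are my reading of the SaveMode enum in the Spark Connect proto, so check them against the generated code (which should also contain constants for these values) before relying on them.

```php
<?php
// Assumed to mirror WriteOperation.SaveMode in the Spark Connect proto.
const SAVE_MODES = [
    'append'        => 1,
    'overwrite'     => 2,
    'errorifexists' => 3,
    'ignore'        => 4,
];

// Hypothetical helper: translate a mode name into the number setMode() expects.
function saveModeFromString(string $mode): int
{
    $key = strtolower($mode);
    if (!array_key_exists($key, SAVE_MODES)) {
        throw new InvalidArgumentException("Unknown save mode: $mode");
    }
    return SAVE_MODES[$key];
}

echo saveModeFromString('overwrite') . PHP_EOL;
```

With something like this in place, the call in parquet could become ->setMode(saveModeFromString('overwrite')), which reads better than a bare number.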

Running the code then becomes:

$session = new SparkSession();
$dataFrame = $session->range(100);
$dataFrame->show(50);
$dataFrame->write->parquet('/tmp/php-parquet');

This can be run using the run_spark_client.sh script, which runs the php code with the gRPC extension loaded:

php -d extension=grpc.so -d max_execution_time=300 spark_client.php

There you are, running Apache Spark jobs from php (ignoring the Apache Arrow bits!) and just to prove this isn’t some autogenerated gpt nonsense:

[Image: Spark output from php]