29 Jun 2017

Bundling Python Packages for PySpark Applications

Overview

Apache Spark seems to be the best thing since sliced bread in the data community. It has several APIs, each at a different level of maturity. I've been focusing on the Python API recently, which has been fairly easy to pick up. The disadvantage is that the only Spark engineer I know is a Scala expert, which creates a bit of a language barrier.

The Spark setup I have at my disposal is Spark 2.0.0 running on YARN on a cluster. Now, one thing you have to take into consideration when you run on a cluster is that any libraries you use have to be available on each node. Two real options exist: 1) install libraries on all nodes in the cluster, or 2) package them up and submit them with your application.

Option 1 sounds easy, but it comes with caveats: if several applications rely on different versions of the same library, and the library is not backwards compatible, maintaining the cluster becomes a headache. How about option 2? In the Scala/Java world, you can submit jars or uber jars (a bundle of all the dependencies) with your application, with the caveat that they must be serializable. This may be preferable, as it makes running applications somewhat cleaner.

Well, that's great for Scala and Java, but what about Python?

Distributing Python functions with a PySpark application

I consulted this blog from Cloudera, which was pretty helpful for the simple case where you have a single Python script whose contents you want to distribute across the cluster.

First, ensure that PySpark is set up to use the same version of Python that you used to develop your script. I achieve this with the following:

export PYSPARK_PYTHON=<path to python>
export PYSPARK_DRIVER_PYTHON=<path to python>
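
A quick way to confirm that the two variables took effect is to compare the interpreter version on the driver with the version the executors report. The following is only a minimal sketch: the helper name python_version is my own invention, and the commented-out lines assume the sc SparkContext that the PySpark shell creates for you.

```python
import sys


def python_version(_=None):
    """Return the running interpreter's version as 'major.minor'."""
    return "{}.{}".format(sys.version_info[0], sys.version_info[1])


# On the driver, this reports the interpreter the session was started with.
driver_version = python_version()
print("driver: " + driver_version)

# Inside a PySpark shell (assumes the `sc` the shell provides), the same
# helper can be mapped over a small RDD to see what the executors run:
#
#   executor_versions = set(
#       sc.parallelize(range(10)).map(python_version).collect()
#   )
#   print(executor_versions)
```

If the set that comes back from the executors contains anything other than the driver's version, the paths in the two exports are worth a second look.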

All you need to do now is use the --py-files argument when you launch your Spark application. Let's quickly create a Python script called myfile.py, containing a test function:

def simple_function(x):
    return x * 2

Spark is running on YARN with dynamic resource allocation, so I don't have to worry about specifying memory, cores and executors. I can create an application by running PySpark from the CLI:

pyspark --py-files myfile.py

And to test, all I need to do is something like this (in my interactive PySpark application):

def test(x):
    from myfile import simple_function
    return simple_function(x)

# test by mapping function over RDD
rdd = sc.parallelize(range(1,100))
rdd.map(lambda x: test(x)).collect()

We see that the function from myfile.py was accessible when we mapped test() across the RDD.

Distributing Python packages with a PySpark application

Now the above example is simple, and the Cloudera blog I linked to could have told you all of that. The blog gets a little hazy when it comes to distributing Python packages across the cluster. We are told that we have two options. First, you can make friends with your sysadmin and hope they can support various Python environments across the cluster using Anaconda or virtualenv (this depends heavily on having someone with both the skills and the time to do it). Otherwise, you can distribute a binary of your library in an .egg file when you submit your application.

I was a little displeased with this result. Firstly, eggs are being deprecated in favour of wheels, and it seems that there is no support for wheels yet.

The next issue is: where do you get the eggs from? Cloudera's blog doesn't detail this at all. Thankfully, it is pretty straightforward, as long as you can get your hands on the package source code.

Let's take beautifulsoup4 as an example (as it isn't included in the Anaconda environments currently installed across the cluster). I downloaded the source code from here, and after unpacking the tar.gz file, I ran the command

python setup.py bdist_egg

to create my egg (it is written to the dist/ directory). Note that I was in a Python 2.7 Anaconda environment on an edge node of the cluster when I did this (the same environment that is available across the cluster). You need to ensure that the machine on which you build the egg has the same architecture as the nodes across the cluster, or you will run into trouble. Thankfully, in my case, the cluster is built on AWS so each of the nodes is identical.
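
Part of the reason this works is that an .egg file is a zip archive, and Python can import pure-Python packages straight from a zip file that sits on sys.path. The sketch below shows that mechanism locally with a made-up package called toy_pkg (hypothetical, nothing to do with beautifulsoup4). For pure-Python code, a plain .zip built this way is also accepted by --py-files, alongside .egg and .py files.

```python
import os
import sys
import tempfile
import zipfile

work_dir = tempfile.mkdtemp()

# Create a tiny pure-Python package on disk: toy_pkg/__init__.py
pkg_dir = os.path.join(work_dir, "toy_pkg")
os.mkdir(pkg_dir)
with open(os.path.join(pkg_dir, "__init__.py"), "w") as handle:
    handle.write("__version__ = '0.0.1'\n")

# Archive it so that toy_pkg/ sits at the root of the zip file.
archive_path = os.path.join(work_dir, "toy_pkg.zip")
with zipfile.ZipFile(archive_path, "w") as archive:
    archive.write(os.path.join(pkg_dir, "__init__.py"), "toy_pkg/__init__.py")

# Putting the archive itself on sys.path lets Python import from inside it.
sys.path.insert(0, archive_path)
import toy_pkg

print(toy_pkg.__version__)
```

Packages with compiled extensions are a different story, which is where the architecture caveat above starts to matter.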

Now it's as simple as running the command

pyspark --py-files <path to egg>

to run the Spark application. To prove that we have the package across the cluster:

def test(x):
    import bs4
    return bs4.__version__

# map function over RDD. collect() returns the
# bs4 version once per element
rdd = sc.parallelize(range(1,100))
rdd.map(lambda x: test(x)).collect()

As we can see, we can access an attribute of the package, demonstrating that it is indeed available across the cluster.
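
The list that comes back does not say which nodes did the work, so a slightly more telling check is to return the hostname along with the version. This is a sketch under my own assumptions: probe is a hypothetical helper, the commented-out line assumes the sc from the PySpark shell, and Spark gives no guarantee that tasks land on every node, so a missing host means "not tested" rather than "broken".

```python
import importlib
import socket


def probe(module_name):
    """Try to import module_name; return (hostname, version or error text)."""
    host = socket.gethostname()
    try:
        module = importlib.import_module(module_name)
    except ImportError as error:
        return (host, "import failed: {}".format(error))
    # Not every module defines __version__, so fall back to a placeholder.
    return (host, getattr(module, "__version__", "unknown"))


# Local stand-in for the cluster: call the probe directly on this machine.
print(probe("json"))

# In the PySpark shell (assumes `sc`), one result per distinct host/version:
#   sc.parallelize(range(100), 20).map(lambda _: probe("bs4")).distinct().collect()
```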

Other thoughts

Well, that wasn't too complex, but as I have found, clear instructions for such a simple requirement can be hard to come by. I'm a little surprised that eggs are still the only way to distribute Python libraries around the nodes in the cluster. Perhaps there is more than one good reason why most serious data engineers seem to work in Scala: Python seems great for data science, but is perhaps not the best choice for engineering.

As a quick comment: what if you are using Jupyter for an interactive PySpark session? Check out this post, which got me up and running!

TL;DR: instructions on how to make an .egg out of a package to submit with a PySpark application.