October 14th, 2015
There is a lot of discussion and demand around Jupyter notebooks these days, and no wonder. Since the big split, the project has focused more and more on providing a generic, interactive notebook interface on top of different interactive kernels. There are already over 50 kernels available, with support for the most widely used programming languages, not just Python.
Jupyter offers nice interactive sessions: the execution history is divided into cells that can be re-evaluated later. With libraries like matplotlib or Bokeh, return values can be visualized inline in the notebook. This also makes Jupyter notebooks a nice candidate for exploratory data analysis. This post is about how to take a step further and integrate Jupyter notebooks with an existing data analysis stack, namely Spark and its dataframe environment, which can also integrate with your existing databases.
Installation of plain Jupyter is straightforward. The easiest way is just to use your package manager and install Jupyter system-wide. For the sake of environment isolation I'll use virtualenv & virtualenvwrapper here. So the first step is to create the virtual environment for this Jupyter installation with virtualenvwrapper:
mkvirtualenv jupyter
As mentioned in the Jupyter installation documentation, Jupyter installation with pip requires some dependencies to be compiled at least on Ubuntu. Install these dependencies first, and then install Jupyter itself with pip.
apt-get install build-essential python-dev
pip install jupyter
The notebook server can be password protected. Note that such a server should also have a proper SSL setup, especially if it is used in an open environment. To generate the password hash, run the following in a Python shell:
from notebook.auth import passwd
passwd()
Next, create a startup script to launch the Jupyter notebook server. Replace the IP with the address of the server's network interface if external access is required, and insert the hashed password computed above.
#!/usr/bin/env bash
source $WORKON_HOME/jupyter/bin/activate
jupyter-notebook --no-browser --ip 127.0.0.1 --NotebookApp.password='your-hashed-password'
The next target is to create a SparkContext and submit jobs to the Spark cluster. For this to succeed, the startup script above must be modified to export the SPARK_HOME environment variable, pointing to the Spark installation directory, before the notebook server is started.
#!/usr/bin/env bash
source $WORKON_HOME/jupyter/bin/activate
export SPARK_HOME=/opt/spark
jupyter-notebook --no-browser --ip 127.0.0.1 --NotebookApp.password='your-hashed-password'
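Depending on how Spark was installed, the `pyspark` package may not be on Python's import path inside the virtualenv even when SPARK_HOME is set. The following is a minimal sketch of a workaround, assuming the usual layout of a Spark distribution (a `python` directory and a bundled `py4j-*-src.zip` under `python/lib`). The helper names `spark_python_paths` and `add_spark_to_path` are made up for this illustration and are not part of Spark or Jupyter.

```python
import glob
import os
import sys


def spark_python_paths(spark_home):
    """Return the directory and zip files needed for `import pyspark`."""
    python_dir = os.path.join(spark_home, 'python')
    # Spark bundles py4j as a zip; the version in the file name varies.
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, 'lib', 'py4j-*-src.zip')))
    return [python_dir] + py4j_zips


def add_spark_to_path(spark_home=None):
    """Prepend Spark's Python paths to sys.path (hypothetical helper)."""
    # Fall back to the SPARK_HOME exported by the startup script.
    spark_home = spark_home or os.environ['SPARK_HOME']
    paths = spark_python_paths(spark_home)
    # Insert in reverse so the final order in sys.path matches `paths`.
    for path in reversed(paths):
        if path not in sys.path:
            sys.path.insert(0, path)
    return paths
```

Calling `add_spark_to_path()` in the first notebook cell, before any `pyspark` import, would be one way to use it. If `pyspark` already imports cleanly, none of this is needed.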
Launch a new notebook on the Jupyter notebook server and create the Spark contexts (change the master argument to point to the master node of your Spark installation):
import os
os.environ['PYSPARK_PYTHON'] = 'python2'
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(master='spark://spark-frontend-host:7077')
sqlContext = SQLContext(sc)
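Before going further it can be worth checking that the context really reaches the cluster. Here is a small sketch of such a smoke test, using only `parallelize` and `sum` from the RDD API. The function name `smoke_test` is an assumption made for this example.

```python
def smoke_test(sc, n=100):
    """Sum 1..n on the cluster and compare with the closed-form result."""
    # parallelize() distributes the numbers, sum() runs a job on the workers.
    total = sc.parallelize(range(1, n + 1)).sum()
    return total == n * (n + 1) // 2
```

Running `smoke_test(sc)` in a notebook cell should return `True` if the job was scheduled and executed. If the master address is wrong, the call would typically hang or fail instead.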
That's pretty straightforward. To connect to an external database and retrieve data into Spark dataframes, an additional jar file is required. With MySQL, for example, the JDBC driver is needed. Download the driver package and extract mysql-connector-java-x.yy.zz-bin.jar to a path that is accessible from every node in the cluster, preferably on a shared file system. With Pouta Virtual Cluster such a path would be under /shared_data; here I use `/shared_data/thirdparty_jars/`.
With direct Spark job submissions from the terminal, one can specify the --driver-class-path argument pointing to extra jars that should be provided to the workers with the job. However, this does not work with the notebook approach, so we must configure these paths for the front end and worker nodes in the spark-defaults.conf file, usually found in the /opt/spark/conf directory.
spark.driver.extraClassPath /shared_data/thirdparty_jars/mysql-connector-java-5.1.35-bin.jar
spark.executor.extraClassPath /shared_data/thirdparty_jars/mysql-connector-java-5.1.35-bin.jar
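A typo in either path only shows up later as a missing driver class, so a quick check of the file can save some time. The sketch below assumes the whitespace-separated `key value` format used in the lines above. The helper names `read_spark_defaults` and `missing_classpath_entries` are invented for this example. It only checks the node it runs on, and wildcard entries would be reported as missing.

```python
import os


def read_spark_defaults(path):
    """Parse spark-defaults.conf into a dict, skipping blanks and # comments."""
    props = {}
    with open(path) as conf:
        for line in conf:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            # Key and value are separated by the first run of whitespace.
            parts = line.split(None, 1)
            props[parts[0]] = parts[1] if len(parts) > 1 else ''
    return props


def missing_classpath_entries(props):
    """Return (property, entry) pairs whose path does not exist on this node."""
    missing = []
    for key in ('spark.driver.extraClassPath', 'spark.executor.extraClassPath'):
        for entry in props.get(key, '').split(os.pathsep):
            if entry and not os.path.exists(entry):
                missing.append((key, entry))
    return missing
```

For example, `missing_classpath_entries(read_spark_defaults('/opt/spark/conf/spark-defaults.conf'))` returns an empty list when every configured jar is found.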
Now the workers in the cluster should be able to load the MySQL connector and connect to the external database. To load the data from Python, use the load method of the reader returned by sqlContext.read. The dbtable option can take either a table name or any subquery that is valid in a FROM clause. This is handy when only part of the data, matching some condition, is to be loaded.
df = sqlContext.read.format('jdbc').options(url='jdbc:mysql://mysqlserver:3306', dbtable='(select * from someTable limit 100) as myTable').load()
df.head()
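When several queries are loaded this way, building the `url` and `dbtable` options by hand gets repetitive. Below is a minimal sketch of a helper that wraps a query into the parenthesised, aliased form that `dbtable` expects. The function name `jdbc_options`, its parameters, and the database name `someDatabase` are assumptions made for illustration only.

```python
def jdbc_options(host, database, query, alias='t', port=3306):
    """Build the url and dbtable options for a MySQL JDBC read."""
    url = 'jdbc:mysql://{0}:{1}/{2}'.format(host, port, database)
    # A subquery used in a FROM clause must be parenthesised and aliased;
    # a trailing semicolon would make it invalid, so strip it.
    dbtable = '({0}) as {1}'.format(query.strip().rstrip(';'), alias)
    return {'url': url, 'dbtable': dbtable}


# Usage in the notebook (requires the sqlContext created earlier):
# opts = jdbc_options('mysqlserver', 'someDatabase',
#                     'select * from someTable limit 100', alias='myTable')
# df = sqlContext.read.format('jdbc').options(**opts).load()
```

Keeping the option building in a plain function also makes it easy to check the generated strings without a running cluster.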
Once the Spark context is created, the cluster resources are allocated to that context until it is stopped. To stop it from the notebook, use
sc.stop()