PySpark in Jupyter Notebook — Working with Dataframe & JDBC Data Sources

Chi Thuc Nguyen
3 min read · Sep 21, 2018

--

Note: this was tested for Spark 2.3.1 on Windows, but it should work for Spark 2.x on every OS. On Linux, please change the path separator from \ to /.

Specifying the driver class

Normally, in order to connect to JDBC data sources (Sqlite, MySQL or PostgreSQL, for example), we need to include the applicable JDBC driver when submitting the application or starting the shell, like this:

bin\pyspark --packages group:name:version

or specify the driver .jar file directly:

bin\pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

With spark shell, for PostgreSQL:

.\bin\spark-shell --packages org.postgresql:postgresql:42.1.1

The driver file will be downloaded automatically if needed, into the local Ivy jars folder shown in the log:

Ivy Default Cache set to: C:\Users\thucnguyen\.ivy2\cache
The jars for the packages stored in: C:\Users\thucnguyen\.ivy2\jars
:: loading settings :: url = jar:file:/D:/projects/bigdata/spark/spark-2.3.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fed48783-3142-4fba-ad61-b4e80ba9b5ee;1.0
confs: [default]
found org.postgresql#postgresql;42.1.1 in central
downloading https://repo1.maven.org/maven2/org/postgresql/postgresql/42.1.1/postgresql-42.1.1.jar ...
[SUCCESSFUL ] org.postgresql#postgresql;42.1.1!postgresql.jar(bundle) (2141ms)

For Jupyter Notebook

If you use Jupyter Notebook, you should set the PYSPARK_SUBMIT_ARGS environment variable before creating the SparkSession, as follows:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.1.1 pyspark-shell'

Or point to a local driver jar file:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars file:///D:/sqlite-jdbc-3.23.1.jar pyspark-shell'
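Putting the two pieces together, a minimal notebook cell could look like the sketch below (the environment variable has to be set before the first SparkSession is created, because the spark-submit arguments are only read once at JVM startup; the app name is just an example):

import os

# Set the submit args before any SparkSession/SparkContext exists
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.1.1 pyspark-shell'

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("jdbc data sources") \
    .getOrCreate()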

Where to find suitable drivers

Maven Repository (to obtain the required coordinates for --packages, select the desired version and copy the group, name and version in the form group:name:version) or Maven Central Repository.
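For example, coordinates passed to --packages take the form group:name:version; the version numbers below are illustrative:

  • PostgreSQL: org.postgresql:postgresql:42.1.1
  • Sqlite: org.xerial:sqlite-jdbc:3.23.1
  • MySQL: mysql:mysql-connector-java:8.0.12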

Connecting to Database

Prepare the JDBC URL and an optional dictionary of JDBC arguments:

url = "jdbc:postgresql://localhost/foobar"
properties = {
"driver": "org.postgresql.Driver",
"user": "foo",
"password": "bar"
}

Reading dataframe from Database

Use sqlContext.read.jdbc (with a SparkSession, spark.read.jdbc works the same way):

df = sqlContext.read.jdbc(url=url,
                          table="baz",
                          properties=properties)

Or:

# the driver class is already part of the properties dict
df = sqlContext.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", tablename) \
    .options(**properties) \
    .load()

By default, JDBC data sources load data sequentially, using a single executor thread. To ensure distributed data loading you can:

  • Provide a partitioning column (which must be numeric, e.g. IntegerType), together with lowerBound, upperBound and numPartitions.
  • Provide a list of mutually exclusive predicates, one for each desired partition.

In distributed mode (with a partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time, there is no guarantee that the final view will be consistent.
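For example, a distributed read could look like the following rough sketch (the partitioning column, bounds and predicate values are made up for illustration):

# Partitioned read: Spark issues numPartitions parallel queries,
# splitting the range [lowerBound, upperBound] of the numeric column
df = sqlContext.read.jdbc(
    url=url,
    table="baz",
    column="id",
    lowerBound=1,
    upperBound=100000,
    numPartitions=4,
    properties=properties)

# Or: one mutually exclusive predicate per desired partition
predicates = ["year = 2016", "year = 2017", "year = 2018"]
df = sqlContext.read.jdbc(
    url=url,
    table="baz",
    predicates=predicates,
    properties=properties)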

Writing dataframe to Database

Use DataFrame.write.jdbc

mode = ...
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

Or

mode = ...
df.write.format("jdbc")\
.option("url", url)\
.option("dbtable", tablename)\
.option(**properties)\
.option("driver", driver)\
.load(mode=mode)

Supported writing modes:

  • append: Append contents of this DataFrame to existing data.
  • overwrite: Overwrite existing data.
  • ignore: Silently ignore this operation if data already exists.
  • error (default case): Throw an exception if data already exists.
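For example, to append to an existing table (reusing the url and properties defined earlier):

df.write.jdbc(url=url, table="baz", mode="append", properties=properties)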

Full working example: reading data from Sqlite into a data frame

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("jdbc data sources") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

driver = "org.sqlite.JDBC"
path = "data/flight-data/jdbc/my-sqlite.db"
url = "jdbc:sqlite:" + path
tablename = "flight_info"

dbDataFrame = spark.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", tablename) \
    .option("driver", driver) \
    .load()
dbDataFrame.show()

Known errors

Class Not Found

Missing driver package (see above for how to specify a driver class or a local jar file):

java.lang.ClassNotFoundException: org.sqlite.JDBC

No suitable driver

java.sql.SQLException: No suitable driver

This means no suitable driver was found for the URL (or there is a driver version mismatch). To solve this, you can add the driver class to the properties. For example:

properties = {
    ...
    "driver": "org.sqlite.JDBC"
}

Strange behavior with 'overwrite' mode while writing to JDBC data sources

During our testing, we found that the 'overwrite' mode does NOT work as expected when writing to a JDBC data source: the data are NOT actually written to the database, and instead we end up with an EMPTY table after writing. The behavior is even unstable; in our case with MySQL, the dataframe itself turned empty after writing as well.

Finally, we came up with the following solution, which works for both Sqlite and MySQL: cache the dataframe right after reading it from the data source, and only after that perform any transformations and finally write to the destination JDBC data source.

df = sqlContext.read.format("jdbc").option(...).load().cache()
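As a fuller sketch of the workaround, reusing the url and tablename from the full Sqlite example above (the properties dict here is just an assumption carrying the driver class):

properties = {"driver": "org.sqlite.JDBC"}

# Cache right after reading and force materialization before writing
df = spark.read.jdbc(url=url, table=tablename, properties=properties).cache()
df.count()

# ... any transformations on df ...

# 'overwrite' drops/truncates the target table before writing; without the
# cache, a lazy re-read of the same table can produce an empty result
df.write.jdbc(url=url, table=tablename, mode="overwrite",
              properties=properties)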
