PySpark in Jupyter Notebook — Working with Dataframe & JDBC Data Sources
Note: this was tested for Spark 2.3.1 on Windows, but it should work for Spark 2.x on every OS. On Linux, please change the path separator from \ to /.
Specifying the driver class
Normally, in order to connect to JDBC data sources (Sqlite, MySQL or PostgreSQL, for example), we need to include the applicable JDBC driver when submitting the application or starting the shell, like this:
bin\pyspark --packages group:name:version
or specify a .jar file:
bin\pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
With spark shell, for PostgreSQL:
.\bin\spark-shell --packages org.postgresql:postgresql:42.1.1
The driver file will be downloaded automatically if needed, into the local Ivy cache (note the .ivy2 paths in the log below):
Ivy Default Cache set to: C:\Users\thucnguyen\.ivy2\cache
The jars for the packages stored in: C:\Users\thucnguyen\.ivy2\jars
:: loading settings :: url = jar:file:/D:/projects/bigdata/spark/spark-2.3.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fed48783-3142-4fba-ad61-b4e80ba9b5ee;1.0
confs: [default]
found org.postgresql#postgresql;42.1.1 in central
downloading https://repo1.maven.org/maven2/org/postgresql/postgresql/42.1.1/postgresql-42.1.1.jar ...
[SUCCESSFUL ] org.postgresql#postgresql;42.1.1!postgresql.jar(bundle) (2141ms)
For Jupyter Notebook
If you use Jupyter Notebook, you should set the PYSPARK_SUBMIT_ARGS environment variable, as follows:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.1.1 pyspark-shell'
Or even use a local driver .jar file:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars file:///D:/sqlite-jdbc-3.23.1.jar pyspark-shell'
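Either way, a minimal notebook sketch might look like this (reusing the PostgreSQL package coordinates from above; the app name "jdbc in jupyter" is just an example). Note that the variable must be set before the SparkSession is created, because it is only read when the underlying JVM is launched:
import os
# must be set before the SparkSession (and the underlying JVM) is started
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.1.1 pyspark-shell'

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("jdbc in jupyter") \
    .getOrCreate()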
Where to find suitable drivers
Maven Repository (to obtain the required coordinates for --packages, select the desired version and copy the data from the Buildr tab, in the form compile-group:name:version) or Maven Central Repository.
Connecting to a Database
Prepare the JDBC URL and an optional dictionary of JDBC arguments:
url = "jdbc:postgresql://localhost/foobar"
properties = {
"driver": "org.postgresql.Driver",
"user": "foo",
"password": "bar"
}
Reading a dataframe from the database
Use sqlContext.read.jdbc:
df = sqlContext.read.jdbc(url=url,\
table="baz", \
properties=properties)
Or:
df = sqlContext.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", tablename) \
    .options(**properties) \
    .option("driver", driver).load()
By default the JDBC data source loads data sequentially, using a single executor thread. To ensure distributed data loading you can:
- Provide a partitioning column (must be IntegerType), together with lowerBound, upperBound and numPartitions.
- Provide a list of mutually exclusive predicates (predicates), one for each desired partition (both options are sketched below).
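For illustration, here is a sketch of both options, reusing the url and properties prepared above and assuming the baz table has an integer id column (the column name, bounds and predicates are made-up examples):
# Option 1: partitioning column with bounds and a number of partitions
df_part = sqlContext.read.jdbc(
    url=url, table="baz",
    column="id", lowerBound=1, upperBound=1000, numPartitions=4,
    properties=properties)

# Option 2: one predicate per desired partition (must be mutually exclusive)
predicates = ["id < 500", "id >= 500"]
df_pred = sqlContext.read.jdbc(
    url=url, table="baz",
    predicates=predicates,
    properties=properties)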
In distributed mode (with a partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time, there is no guarantee that the final view will be consistent.
Writing a dataframe to the database
Use DataFrame.write.jdbc:
mode = ...
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
Or
mode = ...
df.write.format("jdbc") \
    .option("url", url) \
    .option("dbtable", tablename) \
    .options(**properties) \
    .option("driver", driver) \
    .mode(mode) \
    .save()
Supported writing modes:
- append: Append contents of this DataFrame to existing data.
- overwrite: Overwrite existing data.
- ignore: Silently ignore this operation if data already exists.
- error (default case): Throw an exception if data already exists.
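For example, to append new rows to the existing table, reusing the url and properties from above:
# append rows to the existing "baz" table instead of failing or overwriting
df.write.jdbc(url=url, table="baz", mode="append", properties=properties)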
Full working example: reading data from Sqlite into a data frame
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("jdbc data sources") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

driver = "org.sqlite.JDBC"
path = "data/flight-data/jdbc/my-sqlite.db"
url = "jdbc:sqlite:" + path
tablename = "flight_info"

dbDataFrame = spark.read.format("jdbc").option("url", url) \
    .option("dbtable", tablename).option("driver", driver).load()

dbDataFrame.show()
Known errors
Class Not Found
Missing driver package (see above for how to specify a driver class or a local jar file)
java.lang.ClassNotFoundException: org.sqlite.JDBC
No suitable driver
java.sql.SQLException: No suitable driver
This means no suitable driver was found, or there is a driver version mismatch. To solve this, you can add the driver class to the properties. For example:
properties = {
...
"driver": "org.sqlite.JDBC"
}
Strange behavior with 'overwrite' mode while writing to JDBC data sources
During our testing, we found that the 'overwrite' mode does NOT work as expected when writing to a JDBC data source. The actual data is NOT written to the database; instead, we end up with an EMPTY table after writing. The behavior is also unstable: in our case with MySQL, the dataframe itself turned empty after writing as well.
Finally, we came up with the following solution, which works for both Sqlite and MySQL: cache the dataframe right after reading it from the data source, and only after that perform any transformations and finally write to the destination JDBC data source.
sqlContext.read.format('jdbc').option.....load(**kargs).cache()
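Spelled out, the workaround looks roughly like this (a sketch reusing the Sqlite url, tablename and driver from the full example above; the target table name and the transformation are made-up placeholders):
# read and cache immediately after loading from the source
source_df = spark.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", tablename) \
    .option("driver", driver) \
    .load() \
    .cache()

# cache() is lazy, so trigger an action to actually materialize the cached data
source_df.count()

# perform any transformations on the cached dataframe (placeholder transformation)
result_df = source_df.limit(100)

# finally write to the destination; 'overwrite' now behaves as expected
result_df.write.format("jdbc") \
    .option("url", url) \
    .option("dbtable", "flight_info_copy") \
    .option("driver", driver) \
    .mode("overwrite") \
    .save()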