Spark Quick Start
This guide provides a quick peek at Hudi's capabilities using Spark. Using Spark Datasource APIs (both Scala and Python) and Spark SQL, we will walk through code snippets that allow you to insert, update, delete and query a Hudi table.
Setup
Hudi works with Spark 2.4.3+ and Spark 3.x versions. You can follow the instructions here for setting up Spark.
Spark 3 Support Matrix
| Hudi | Supported Spark 3 version |
|---|---|
| 0.15.x | 3.5.x (default build), 3.4.x, 3.3.x, 3.2.x, 3.1.x, 3.0.x |
| 0.14.x | 3.4.x (default build), 3.3.x, 3.2.x, 3.1.x, 3.0.x |
| 0.13.x | 3.3.x (default build), 3.2.x, 3.1.x |
| 0.12.x | 3.3.x (default build), 3.2.x, 3.1.x |
| 0.11.x | 3.2.x (default build, Spark bundle only), 3.1.x |
| 0.10.x | 3.1.x (default build), 3.0.x |
| 0.7.0 - 0.9.0 | 3.0.x |
| 0.6.0 and prior | not supported |
The default build Spark version indicates how we build hudi-spark3-bundle.
In 0.15.0, we introduced support for Spark 3.5.x. In 0.14.0, we introduced support for Spark 3.4.x and brought back support for Spark 3.0.x. In 0.12.0, we introduced experimental support for Spark 3.3.0. In 0.11.0, there are changes in how Spark bundles are used; please refer to the 0.11.0 release notes for detailed instructions.
Spark Shell/SQL
- Scala
- Python
- Spark SQL
From the extracted directory run spark-shell with Hudi:
# For Spark versions: 3.2 - 3.5
export SPARK_VERSION=3.5 # or 3.4, 3.3, 3.2
spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.15.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark versions: 3.0 - 3.1
export SPARK_VERSION=3.1 # or 3.0
spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.15.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark version: 2.4
spark-shell --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.15.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
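Once the shell is up, you can optionally sanity-check that the session picked up the Hudi configs. A minimal sketch using standard Spark APIs (the expected values match the --conf flags above):
// spark-shell: optional sanity check of the launch configs
spark.version                          // e.g. 3.5.x
spark.conf.get("spark.serializer")     // org.apache.spark.serializer.KryoSerializer
spark.conf.get("spark.sql.extensions") // org.apache.spark.sql.hudi.HoodieSparkSessionExtension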
From the extracted directory run pyspark with Hudi:
# For Spark versions: 3.2 - 3.5
export PYSPARK_PYTHON=$(which python3)
export SPARK_VERSION=3.5 # or 3.4, 3.3, 3.2
pyspark --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.15.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark versions: 3.0 - 3.1
export PYSPARK_PYTHON=$(which python3)
export SPARK_VERSION=3.1 # or 3.0
pyspark --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.15.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark version: 2.4
export PYSPARK_PYTHON=$(which python3)
pyspark --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.15.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
Hudi supports using Spark SQL to write and read data via the HoodieSparkSessionExtension SQL extension. From the extracted directory, run Spark SQL with Hudi:
# For Spark versions: 3.2 - 3.5
export SPARK_VERSION=3.5 # or 3.4, 3.3, 3.2
spark-sql --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.15.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark versions: 3.0 - 3.1
export SPARK_VERSION=3.1 # or 3.0
spark-sql --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.15.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark version: 2.4
spark-sql --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.15.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
We recommend setting this config to reduce Kryo serialization overhead:
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
For Spark 3.2 and above, the additional spark_catalog config is required:
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
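The same settings can also be applied programmatically when building your own SparkSession, for example in a standalone application submitted with spark-submit rather than an interactive shell. A minimal sketch, assuming the matching Hudi Spark bundle is already on the classpath (the app name and local master below are illustrative):
// Standalone app: wiring the same configs on a SparkSession builder
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-quickstart")  // illustrative name
  .master("local[*]")          // local testing only; omit when submitting to a cluster
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") // Spark 3.2+
  .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
  .getOrCreate()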
Setup project
Below, we set up the imports, the table name, and the corresponding base path.
- Scala
- Python
- Spark SQL
// spark-shell
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._
val tableName = "trips_table"
val basePath = "file:///tmp/trips_table"
# pyspark
from pyspark.sql.functions import lit, col
tableName = "trips_table"
basePath = "file:///tmp/trips_table"
The next section will go over create table commands.