Spark Guide
This guide provides a quick peek at Hudi's capabilities using Spark. Using the Spark Datasource APIs (both Scala and Python) and Spark SQL, we will walk through code snippets that allow you to insert, update, delete and query a Hudi table.
Setup
Hudi works with Spark 2.4.3+ and Spark 3.x versions. You can follow the instructions here for setting up Spark.
Spark 3 Support Matrix
Hudi | Supported Spark 3 version |
---|---|
0.14.x | 3.4.x (default build), 3.3.x, 3.2.x, 3.1.x, 3.0.x |
0.13.x | 3.3.x (default build), 3.2.x, 3.1.x |
0.12.x | 3.3.x (default build), 3.2.x, 3.1.x |
0.11.x | 3.2.x (default build, Spark bundle only), 3.1.x |
0.10.x | 3.1.x (default build), 3.0.x |
0.7.0 - 0.9.0 | 3.0.x |
0.6.0 and prior | not supported |
The default build Spark version indicates how we build hudi-spark3-bundle.
In 0.14.0, we introduced support for Spark 3.4.x and brought back support for Spark 3.0.x. In 0.12.0, we introduced experimental support for Spark 3.3.0. In 0.11.0, there were changes to how Spark bundles are used; please refer to the 0.11.0 release notes for detailed instructions.
Spark Shell/SQL
- Scala
- Python
- Spark SQL
From the extracted directory run spark-shell with Hudi:
# For Spark versions: 3.2 - 3.4
export SPARK_VERSION=3.4
spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark versions: 3.0 - 3.1
export SPARK_VERSION=3.1
spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark version: 2.4
spark-shell --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
From the extracted directory run pyspark with Hudi:
# For Spark versions: 3.2 - 3.4
export PYSPARK_PYTHON=$(which python3)
export SPARK_VERSION=3.4
pyspark --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark versions: 3.0 - 3.1
export PYSPARK_PYTHON=$(which python3)
export SPARK_VERSION=3.1
pyspark --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark version: 2.4
export PYSPARK_PYTHON=$(which python3)
pyspark --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
Hudi supports using Spark SQL to write and read data with the HoodieSparkSessionExtension SQL extension. From the extracted directory, run Spark SQL with Hudi:
# For Spark versions: 3.2 - 3.4
export SPARK_VERSION=3.4
spark-sql --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark versions: 3.0 - 3.1
export SPARK_VERSION=3.1
spark-sql --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark version: 2.4
spark-sql --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
Use Scala 2.12 builds with an additional config: --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
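If you are embedding Hudi in a Spark application rather than launching a shell, the same settings can be applied when building the SparkSession. Below is a minimal, illustrative Scala sketch using the configs shown above; the application name is a placeholder, and the Hudi Spark bundle is assumed to be on the classpath.
// scala - illustrative sketch only; "hudi-quickstart" is a placeholder app name
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().
  appName("hudi-quickstart").
  config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
  config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
  config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog").
  config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar").
  getOrCreate()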
Setup project
Below, we do imports and set up the table name and corresponding base path.
- Scala
- Python
- Spark SQL
// spark-shell
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._
val tableName = "trips_table"
val basePath = "file:///tmp/trips_table"
# pyspark
from pyspark.sql.functions import lit, col
tableName = "trips_table"
basePath = "file:///tmp/trips_table"
-- Next section will go over create table commands
Create Table
First, let's create a Hudi table. Here, we use a partitioned table for illustration, but Hudi also supports non-partitioned tables.
- Scala
- Python
- Spark SQL
// scala
// First commit will auto-initialize the table, if it did not exist in the specified base path.
# pyspark
# First commit will auto-initialize the table, if it did not exist in the specified base path.
If you have Spark-Hive integration in your environment, this guide assumes that you have the appropriate settings configured to allow Spark to create tables and register them in the Hive Metastore.
Here is an example of creating a Hudi table.
-- create a Hudi table that is partitioned.
CREATE TABLE hudi_table (
ts BIGINT,
uuid STRING,
rider STRING,
driver STRING,
fare DOUBLE,
city STRING
) USING HUDI
PARTITIONED BY (city);
For more options for creating Hudi tables, or if you're running into any issues, please refer to the SQL DDL reference guide.
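As noted above, Hudi also supports non-partitioned tables. A minimal, illustrative Scala sketch of creating one through the datasource writer is shown below; it simply omits the partition path field, the table name and base path are placeholders, and exact defaults may vary by release.
// scala - illustrative sketch only; omitting the partition path field yields a non-partitioned table
val nonPartitionedDf = spark.createDataFrame(Seq(
  (1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco")
)).toDF("ts", "uuid", "rider", "driver", "fare", "city")

nonPartitionedDf.write.format("hudi").
  option(TABLE_NAME, "trips_table_nonpartitioned").
  mode(Overwrite).
  save("file:///tmp/trips_table_nonpartitioned")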
Insert data
- Scala
- Python
- Spark SQL
Generate some new records as a DataFrame and write the DataFrame into a Hudi table. Since this is the first write, it will also auto-create the table.
// spark-shell
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo" ),
(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
Hudi provides a wide range of write operations - both batch and incremental - to write data into Hudi tables, with different semantics and performance. When record keys are not configured (see keys below), bulk_insert will be chosen as the write operation, matching the out-of-the-box behavior of Spark's Parquet Datasource.
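For instance, a minimal, illustrative sketch of what the first write could look like if you configure a record key and an explicit write operation up front is shown below; it reuses the inserts DataFrame, tableName and basePath from above, and uses Hudi's standard datasource write option keys (see the write operations and configuration docs for full details).
// scala - illustrative sketch only; configures a record key and an explicit write operation
inserts.write.format("hudi").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option(PARTITIONPATH_FIELD_NAME.key(), "city").
  option("hoodie.datasource.write.operation", "upsert").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)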
Generate some new records as a DataFrame and write the DataFrame into a Hudi table. Since this is the first write, it will also auto-create the table.
# pyspark
columns = ["ts","uuid","rider","driver","fare","city"]
data =[(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
(1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
(1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
(1695115999911,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai")]
inserts = spark.createDataFrame(data).toDF(*columns)
hudi_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.partitionpath.field': 'city'
}
inserts.write.format("hudi"). \
options(**hudi_options). \
mode("overwrite"). \
save(basePath)
Hudi provides a wide range of write operations - both batch and incremental - to write data into Hudi tables, with different semantics and performance. When record keys are not configured (see keys below), bulk_insert will be chosen as the write operation, matching the out-of-the-box behavior of Spark's Parquet Datasource.
Users can use 'INSERT INTO' to insert data into a Hudi table. See Insert Into for more advanced options.
INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo' ),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo' ),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai' ),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
If you want to control the Hudi write operation used for the INSERT statement, you can set the following config before issuing the INSERT statement:
-- bulk_insert using INSERT_INTO
SET hoodie.spark.sql.insert.into.operation = 'bulk_insert'
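These statements can also be issued from the Scala shell via spark.sql. A minimal, illustrative sketch, assuming the SQL extension is enabled and hudi_table already exists:
// scala - illustrative sketch only
spark.sql("SET hoodie.spark.sql.insert.into.operation = 'bulk_insert'")
spark.sql("INSERT INTO hudi_table VALUES (1695046462179, '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 'rider-D', 'driver-L', 33.90, 'san_francisco')")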