Skip to main content
Version: Current

Batch Writes

Spark DataSource API

The hudi-spark module offers the DataSource API to write a Spark DataFrame into a Hudi table.

There are a number of options available:

HoodieWriteConfig:

TABLE_NAME

DataSourceWriteOptions:

RECORDKEY_FIELD: Primary key field(s). Record keys uniquely identify a record/row within each partition. If one wants to have a global uniqueness, there are two options. You could either make the dataset non-partitioned, or, you can leverage Global indexes to ensure record keys are unique irrespective of the partition path. Record keys can either be a single column or refer to multiple columns. KEYGENERATOR_CLASS_OPT_KEY property should be set accordingly based on whether it is a simple or complex key. For eg: "col1" for simple field, "col1,col2,col3,etc" for complex field. Nested fields can be specified using the dot notation eg: a.b.c.
Default value: "uuid"

PARTITIONPATH_FIELD: Columns to be used for partitioning the table. To prevent partitioning, provide empty string as value eg: "". Specify partitioning/no partitioning using KEYGENERATOR_CLASS_OPT_KEY. If partition path needs to be url encoded, you can set URL_ENCODE_PARTITIONING_OPT_KEY. If synchronizing to hive, also specify using HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.
Default value: "partitionpath"

PRECOMBINE_FIELD: When two records within the same batch have the same key value, the record with the largest value from the field specified will be choosen. If you are using default payload of OverwriteWithLatestAvroPayload for HoodieRecordPayload (WRITE_PAYLOAD_CLASS), an incoming record will always takes precendence compared to the one in storage ignoring this PRECOMBINE_FIELD_OPT_KEY.
Default value: "ts"

OPERATION: The write operations to use.
Available values:
"upsert" (default), "bulk_insert", "insert", "delete"

TABLE_TYPE: The type of table to write to. Note: After the initial creation of a table, this value must stay consistent when writing to (updating) the table using the Spark SaveMode.Append mode.
Available values:
COW_TABLE_TYPE_OPT_VAL (default), MOR_TABLE_TYPE_OPT_VAL

KEYGENERATOR_CLASS_NAME: Refer to Key Generation section below.

Example: Upsert a DataFrame, specifying the necessary field names for recordKey => _row_key, partitionPath => partition, and precombineKey => timestamp

inputDF.write()
.format("hudi")
.options(clientOpts) //Where clientOpts is of type Map[String, String]. clientOpts can include any other options necessary.
.option("hoodie.datasource.write.recordkey.field", "_row_key")
.option("hoodie.datasource.write.partitionpath.field", "partition")
.option("hoodie.datasource.write.precombine.field"(), "timestamp")
.option("hoodie.table.name", tableName)
.mode(SaveMode.Append)
.save(basePath);

Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.

// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
mode(Overwrite).
save(basePath)
info

mode(Overwrite) overwrites and recreates the table if it already exists. You can check the data generated under /tmp/hudi_trips_cow/<region>/<country>/<city>/. We provided a record key (uuid in schema), partition field (region/country/city) and combine logic (ts in schema) to ensure trip records are unique within each partition. For more info, refer to Modeling data stored in Hudi and for info on ways to ingest data into Hudi, refer to Writing Hudi Tables. Here we are using the default write operation : upsert. If you have a workload without updates, you can also issue insert or bulk_insert operations which could be faster. To know more, refer to Write operations

Checkout https://hudi.apache.org/blog/2021/02/13/hudi-key-generators for various key generator options, like Timestamp based, complex, custom, NonPartitioned Key gen, etc.

Insert Overwrite Table

Generate some new trips, overwrite the table logically at the Hudi metadata level. The Hudi cleaner will eventually clean up the previous table snapshot's file groups. This can be faster than deleting the older table and recreating in Overwrite mode.

// spark-shell
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
show(10, false)

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.operation","insert_overwrite_table").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
mode(Append).
save(basePath)

// Should have different keys now, from query before.
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
show(10, false)

Insert Overwrite

Generate some new trips, overwrite the all the partitions that are present in the input. This operation can be faster than upsert for batch ETL jobs, that are recomputing entire target partitions at once (as opposed to incrementally updating the target tables). This is because, we are able to bypass indexing, precombining and other repartitioning steps in the upsert write path completely.

// spark-shell
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
read.json(spark.sparkContext.parallelize(inserts, 2)).
filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.operation","insert_overwrite").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
mode(Append).
save(basePath)

// Should have different keys now for San Francisco alone, from query before.
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)

Deletes

Hudi supports implementing two types of deletes on data stored in Hudi tables, by enabling the user to specify a different record payload implementation. For more info refer to Delete support in Hudi.

  • Soft Deletes : Retain the record key and just null out the values for all the other fields. This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null. Note that soft deletes are always persisted in storage and never removed, but all values are set to nulls. So for GDPR or other compliance reasons, users should consider doing hard deletes if record key and partition path contain PII.

For example:

// fetch two records for soft deletes
val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)

// prepare the soft deletes by ensuring the appropriate fields are nullified
val nullifyColumns = softDeleteDs.schema.fields.
map(field => (field.name, field.dataType.typeName)).
filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
&& !Array("ts", "uuid", "partitionpath").contains(pair._1)))

val softDeleteDf = nullifyColumns.
foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
(ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))

// simply upsert the table after setting these fields to null
softDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.operation", "upsert").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
mode(Append).
save(basePath)
  • Hard Deletes : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.
  1. Using Datasource, set "hoodie.datasource.write.operation" to "delete". This will remove all the records in the DataSet being submitted.

Example, first read in a dataset:

val roViewDF = spark.
read.
format("org.apache.hudi").
load(basePath + "/*/*/*/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select count(*) from hudi_ro_table").show() // should return 10 (number of records inserted above)
val riderValue = spark.sql("select distinct rider from hudi_ro_table").show()
// copy the value displayed to be used in next step

Now write a query of which records you would like to delete:

val df = spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'")

Lastly, execute the deletion of these records:

val deletes = dataGen.generateDeletes(df.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.operation","delete").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", tableName).
mode(Append).
save(basePath);
  1. Using DataSource, set PAYLOAD_CLASS_OPT_KEY to "org.apache.hudi.EmptyHoodieRecordPayload". This will remove all the records in the DataSet being submitted.

This example will remove all the records from the table that exist in the DataSet deleteDF:

 deleteDF // dataframe containing just records to be deleted
.write().format("org.apache.hudi")
.option(...) // Add HUDI options like record-key, partition-path and others as needed for your setup
// specify record_key, partition_key, precombine_fieldkey & usual params
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
  1. Using DataSource or DeltaStreamer, add a column named _hoodie_is_deleted to DataSet. The value of this column must be set to true for all the records to be deleted and either false or left null for any records which are to be upserted.

Let's say the original schema is:

{
"type":"record",
"name":"example_tbl",
"fields":[{
"name": "uuid",
"type": "String"
}, {
"name": "ts",
"type": "string"
}, {
"name": "partitionPath",
"type": "string"
}, {
"name": "rank",
"type": "long"
}
]}

Make sure you add _hoodie_is_deleted column:

{
"type":"record",
"name":"example_tbl",
"fields":[{
"name": "uuid",
"type": "String"
}, {
"name": "ts",
"type": "string"
}, {
"name": "partitionPath",
"type": "string"
}, {
"name": "rank",
"type": "long"
}, {
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
}
]}

Then any record you want to delete you can mark _hoodie_is_deleted as true:

{"ts": 0.0, "uuid": "19tdb048-c93e-4532-adf9-f61ce6afe10", "rank": 1045, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : true}

Concurrency Control

Following is an example of how to use optimistic_concurrency_control via Spark DataSource API.

Read more in-depth details about concurrency control in the concurrency control concepts section.

inputDF.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option("hoodie.datasource.write.precombine.field", "ts")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
.option("hoodie.write.lock.zookeeper.url", "zookeeper")
.option("hoodie.write.lock.zookeeper.port", "2181")
.option("hoodie.write.lock.zookeeper.lock_key", "test_table")
.option("hoodie.write.lock.zookeeper.base_path", "/test")
.option("hoodie.datasource.write.recordkey.field", "uuid")
.option("hoodie.datasource.write.partitionpath.field", "partitionpath")
.option("hoodie.table.name", tableName)
.mode(Overwrite)
.save(basePath)

Java Client

We can use plain java to write to hudi tables. To use Java client we can refere here