Skip to main content
Version: 0.12.0

Writing Data

In this section, we will cover ways to ingest new changes from external sources or even other Hudi tables. The two main tools available are the DeltaStreamer tool, as well as the Spark Hudi datasource.

Spark Datasource Writer

The hudi-spark module offers the DataSource API to write (and read) a Spark DataFrame into a Hudi table. There are a number of options available:

HoodieWriteConfig:

TABLE_NAME (Required)

DataSourceWriteOptions:

RECORDKEY_FIELD_OPT_KEY (Required): Primary key field(s). Record keys uniquely identify a record/row within each partition. If one wants to have a global uniqueness, there are two options. You could either make the dataset non-partitioned, or, you can leverage Global indexes to ensure record keys are unique irrespective of the partition path. Record keys can either be a single column or refer to multiple columns. KEYGENERATOR_CLASS_OPT_KEY property should be set accordingly based on whether it is a simple or complex key. For eg: "col1" for simple field, "col1,col2,col3,etc" for complex field. Nested fields can be specified using the dot notation eg: a.b.c.
Default value: "uuid"

PARTITIONPATH_FIELD_OPT_KEY (Required): Columns to be used for partitioning the table. To prevent partitioning, provide empty string as value eg: "". Specify partitioning/no partitioning using KEYGENERATOR_CLASS_OPT_KEY. If partition path needs to be url encoded, you can set URL_ENCODE_PARTITIONING_OPT_KEY. If synchronizing to hive, also specify using HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.
Default value: "partitionpath"

PRECOMBINE_FIELD_OPT_KEY (Required): When two records within the same batch have the same key value, the record with the largest value from the field specified will be choosen. If you are using default payload of OverwriteWithLatestAvroPayload for HoodieRecordPayload (WRITE_PAYLOAD_CLASS), an incoming record will always takes precendence compared to the one in storage ignoring this PRECOMBINE_FIELD_OPT_KEY.
Default value: "ts"

OPERATION_OPT_KEY: The write operations to use.
Available values:
UPSERT_OPERATION_OPT_VAL (default), BULK_INSERT_OPERATION_OPT_VAL, INSERT_OPERATION_OPT_VAL, DELETE_OPERATION_OPT_VAL

TABLE_TYPE_OPT_KEY: The type of table to write to. Note: After the initial creation of a table, this value must stay consistent when writing to (updating) the table using the Spark SaveMode.Append mode.
Available values:
COW_TABLE_TYPE_OPT_VAL (default), MOR_TABLE_TYPE_OPT_VAL

KEYGENERATOR_CLASS_OPT_KEY: Refer to Key Generation section below.

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: If using hive, specify if the table should or should not be partitioned.
Available values:
classOf[MultiPartKeysValueExtractor].getCanonicalName (default), classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName, classOf[TimestampBasedKeyGenerator].getCanonicalName, classOf[NonPartitionedExtractor].getCanonicalName, classOf[GlobalDeleteKeyGenerator].getCanonicalName (to be used when OPERATION_OPT_KEY is set to DELETE_OPERATION_OPT_VAL)

Example: Upsert a DataFrame, specifying the necessary field names for recordKey => _row_key, partitionPath => partition, and precombineKey => timestamp

inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) //Where clientOpts is of type Map[String, String]. clientOpts can include any other options necessary.
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);

Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.

// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
info

mode(Overwrite) overwrites and recreates the table if it already exists. You can check the data generated under /tmp/hudi_trips_cow/<region>/<country>/<city>/. We provided a record key (uuid in schema), partition field (region/country/city) and combine logic (ts in schema) to ensure trip records are unique within each partition. For more info, refer to Modeling data stored in Hudi and for info on ways to ingest data into Hudi, refer to Writing Hudi Tables. Here we are using the default write operation : upsert. If you have a workload without updates, you can also issue insert or bulk_insert operations which could be faster. To know more, refer to Write operations

Checkout https://hudi.apache.org/blog/2021/02/13/hudi-key-generators for various key generator options, like Timestamp based, complex, custom, NonPartitioned Key gen, etc.

Insert Overwrite Table

Generate some new trips, overwrite the table logically at the Hudi metadata level. The Hudi cleaner will eventually clean up the previous table snapshot's file groups. This can be faster than deleting the older table and recreating in Overwrite mode.

// spark-shell
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
show(10, false)

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"insert_overwrite_table").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

// Should have different keys now, from query before.
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
show(10, false)

Insert Overwrite

Generate some new trips, overwrite the all the partitions that are present in the input. This operation can be faster than upsert for batch ETL jobs, that are recomputing entire target partitions at once (as opposed to incrementally updating the target tables). This is because, we are able to bypass indexing, precombining and other repartitioning steps in the upsert write path completely.

// spark-shell
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
read.json(spark.sparkContext.parallelize(inserts, 2)).
filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"insert_overwrite").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

// Should have different keys now for San Francisco alone, from query before.
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)

Deletes

Hudi supports implementing two types of deletes on data stored in Hudi tables, by enabling the user to specify a different record payload implementation. For more info refer to Delete support in Hudi.

  • Soft Deletes : Retain the record key and just null out the values for all the other fields. This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null. Note that soft deletes are always persisted in storage and never removed, but all values are set to nulls. So for GDPR or other compliance reasons, users should consider doing hard deletes if record key and partition path contain PII.

For example:

// fetch two records for soft deletes
val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)

// prepare the soft deletes by ensuring the appropriate fields are nullified
val nullifyColumns = softDeleteDs.schema.fields.
map(field => (field.name, field.dataType.typeName)).
filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
&& !Array("ts", "uuid", "partitionpath").contains(pair._1)))

val softDeleteDf = nullifyColumns.
foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
(ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))

// simply upsert the table after setting these fields to null
softDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY, "upsert").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
  • Hard Deletes : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.
  1. Using Datasource, set OPERATION_OPT_KEY to DELETE_OPERATION_OPT_VAL. This will remove all the records in the DataSet being submitted.

Example, first read in a dataset:

val roViewDF = spark.
read.
format("org.apache.hudi").
load(basePath + "/*/*/*/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select count(*) from hudi_ro_table").show() // should return 10 (number of records inserted above)
val riderValue = spark.sql("select distinct rider from hudi_ro_table").show()
// copy the value displayed to be used in next step

Now write a query of which records you would like to delete:

val df = spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'")

Lastly, execute the deletion of these records:

val deletes = dataGen.generateDeletes(df.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath);
  1. Using DataSource, set PAYLOAD_CLASS_OPT_KEY to "org.apache.hudi.common.model.EmptyHoodieRecordPayload". This will remove all the records in the DataSet being submitted.

This example will remove all the records from the table that exist in the DataSet deleteDF:

 deleteDF // dataframe containing just records to be deleted
.write().format("org.apache.hudi")
.option(...) // Add HUDI options like record-key, partition-path and others as needed for your setup
// specify record_key, partition_key, precombine_fieldkey & usual params
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
  1. Using DataSource or DeltaStreamer, add a column named _hoodie_is_deleted to DataSet. The value of this column must be set to true for all the records to be deleted and either false or left null for any records which are to be upserted.

Let's say the original schema is:

{
"type":"record",
"name":"example_tbl",
"fields":[{
"name": "uuid",
"type": "String"
}, {
"name": "ts",
"type": "string"
}, {
"name": "partitionPath",
"type": "string"
}, {
"name": "rank",
"type": "long"
}
]}

Make sure you add _hoodie_is_deleted column:

{
"type":"record",
"name":"example_tbl",
"fields":[{
"name": "uuid",
"type": "String"
}, {
"name": "ts",
"type": "string"
}, {
"name": "partitionPath",
"type": "string"
}, {
"name": "rank",
"type": "long"
}, {
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
}
]}

Then any record you want to delete you can mark _hoodie_is_deleted as true:

{"ts": 0.0, "uuid": "19tdb048-c93e-4532-adf9-f61ce6afe10", "rank": 1045, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : true}

Concurrency Control

The hudi-spark module offers the DataSource API to write (and read) a Spark DataFrame into a Hudi table.

Following is an example of how to use optimistic_concurrency_control via spark datasource. Read more in depth about concurrency control in the concurrency control concepts section

inputDF.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
.option("hoodie.write.lock.zookeeper.url", "zookeeper")
.option("hoodie.write.lock.zookeeper.port", "2181")
.option("hoodie.write.lock.zookeeper.lock_key", "test_table")
.option("hoodie.write.lock.zookeeper.base_path", "/test")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
.option(TABLE_NAME, tableName)
.mode(Overwrite)
.save(basePath)

Commit Notifications

Apache Hudi provides the ability to post a callback notification about a write commit. This may be valuable if you need an event notification stream to take actions with other services after a Hudi write commit. You can push a write commit callback notification into HTTP endpoints or to a Kafka server.

HTTP Endpoints

You can push a commit notification to an HTTP URL and can specify custom values by extending a callback class defined below.

ConfigDescriptionRequiredDefault
TURN_CALLBACK_ONTurn commit callback on/offoptionalfalse (callbacks off)
CALLBACK_HTTP_URLCallback host to be sent along with callback messagesrequiredN/A
CALLBACK_HTTP_TIMEOUT_IN_SECONDSCallback timeout in secondsoptional3
CALLBACK_CLASS_NAMEFull path of callback class and must be a subclass of HoodieWriteCommitCallback class, org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback by defaultoptionalorg.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback
CALLBACK_HTTP_API_KEY_VALUEHttp callback API keyoptionalhudi_write_commit_http_callback

Kafka Endpoints

You can push a commit notification to a Kafka topic so it can be used by other real time systems.

ConfigDescriptionRequiredDefault
TOPICKafka topic name to publish timeline activity into.requiredN/A
PARTITIONIt may be desirable to serialize all changes into a single Kafka partition for providing strict ordering. By default, Kafka messages are keyed by table name, which guarantees ordering at the table level, but not globally (or when new partitions are added)requiredN/A
RETRIESTimes to retry the produceoptional3
ACKSkafka acks level, all by default to ensure strong durabilityoptionalall
BOOTSTRAP_SERVERSBootstrap servers of kafka cluster, to be used for publishing commit metadatarequiredN/A

Pulsar Endpoints

You can push a commit notification to a Pulsar topic so it can be used by other real time systems.

ConfigDescriptionRequiredDefault
hoodie.write.commit.callback.pulsar.broker.service.urlServer's Url of pulsar cluster to use to publish commit metadata.requiredN/A
hoodie.write.commit.callback.pulsar.topicPulsar topic name to publish timeline activity intorequiredN/A
hoodie.write.commit.callback.pulsar.producer.route-modeMessage routing logic for producers on partitioned topics.optionalRoundRobinPartition
hoodie.write.commit.callback.pulsar.producer.pending-queue-sizeThe maximum size of a queue holding pending messages.optional1000
hoodie.write.commit.callback.pulsar.producer.pending-total-sizeThe maximum number of pending messages across partitions.required50000
hoodie.write.commit.callback.pulsar.producer.block-if-queue-fullWhen the queue is full, the method is blocked instead of an exception is thrown.optionaltrue
hoodie.write.commit.callback.pulsar.producer.send-timeoutThe timeout in each sending to pulsar.optional30s
hoodie.write.commit.callback.pulsar.operation-timeoutDuration of waiting for completing an operation.optional30s
hoodie.write.commit.callback.pulsar.connection-timeoutDuration of waiting for a connection to a broker to be established.optional10s
hoodie.write.commit.callback.pulsar.request-timeoutDuration of waiting for completing a request.optional60s
hoodie.write.commit.callback.pulsar.keepalive-intervalDuration of keeping alive interval for each client broker connection.optional30s

Bring your own implementation

You can extend the HoodieWriteCommitCallback class to implement your own way to asynchronously handle the callback of a successful write. Use this public API:

https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java

The hudi-flink module defines the Flink SQL connector for both hudi source and sink. There are a number of options available for the sink table:

Option NameRequiredDefaultRemarks
pathYN/ABase path for the target hoodie table. The path would be created if it does not exist, otherwise a hudi table expects to be initialized successfully
table.typeNCOPY_ON_WRITEType of table to write. COPY_ON_WRITE (or) MERGE_ON_READ
write.operationNupsertThe write operation, that this write should do (insert or upsert is supported)
write.precombine.fieldNtsField used in preCombining before actual write. When two records have the same key value, we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)
write.payload.classNOverwriteWithLatestAvroPayload.classPayload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. This will render any value set for the option in-effective
write.insert.drop.duplicatesNfalseFlag to indicate whether to drop duplicates upon insert. By default insert will accept duplicates, to gain extra performance
write.ignore.failedNtrueFlag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch. By default true (in favor of streaming progressing over data integrity)
hoodie.datasource.write.recordkey.fieldNuuidRecord key field. Value to be used as the recordKey component of HoodieKey. Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using the dot notation eg: a.b.c
hoodie.datasource.write.keygenerator.classNSimpleAvroKeyGenerator.classKey generator class, that implements will extract the key out of incoming record
write.tasksN4Parallelism of tasks that do actual write, default is 4
write.batch.size.MBN128Batch buffer size in MB to flush data into the underneath filesystem

If the table type is MERGE_ON_READ, you can also specify the asynchronous compaction strategy through options:

Option NameRequiredDefaultRemarks
compaction.async.enabledNtrueAsync Compaction, enabled by default for MOR
compaction.trigger.strategyNnum_commitsStrategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits; 'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction; 'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; 'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits'
compaction.delta_commitsN5Max delta commits needed to trigger compaction, default 5 commits
compaction.delta_secondsN3600Max delta seconds time needed to trigger compaction, default 1 hour

You can write the data using the SQL INSERT INTO statements:

INSERT INTO hudi_table select ... from ...; 

Note: INSERT OVERWRITE is not supported yet but already on the roadmap.