Configurations
This page covers the different ways of configuring your job to write/read Hudi tables. At a high level, you can control behaviour at a few levels.
- Spark Datasource Configs: These configs control the Hudi Spark Datasource, providing the ability to define keys/partitioning, pick the write operation, specify how to merge records, or choose the query type to read.
- Flink Sql Configs: These configs control the Hudi Flink SQL source/sink connectors, providing the ability to define record keys, pick the write operation, specify how to merge records, enable/disable asynchronous compaction, or choose the query type to read.
- Write Client Configs: Internally, the Hudi datasource uses an RDD-based HoodieWriteClient API to actually perform writes to storage. These configs provide deep control over lower-level aspects like file sizing, compression, parallelism, compaction, write schema, cleaning etc. Although Hudi provides sane defaults, from time to time these configs may need to be tweaked to optimize for specific workloads.
- Metrics Configs: This set of configs is used to enable monitoring and reporting of key Hudi stats and metrics.
- Record Payload Config: This is the lowest level of customization offered by Hudi. Record payloads define how to produce new values to upsert based on the incoming new record and the stored old record. Hudi provides default implementations such as OverwriteWithLatestAvroPayload, which simply updates the table with the latest/last-written record. This can be overridden with a custom class extending the HoodieRecordPayload class, at both the datasource and WriteClient levels.
- Kafka Connect Configs: This set of configs is used by the Kafka Connect Sink Connector for writing Hudi tables.
- Amazon Web Services Configs: Configs specific to Amazon Web Services.
Externalized Config File
Instead of directly passing configuration settings to every Hudi job, you can also centrally set them in a configuration file, hudi-defaults.conf. By default, Hudi loads the configuration file under the /etc/hudi/conf directory. You can specify a different configuration directory location by setting the HUDI_CONF_DIR environment variable. This can be useful for uniformly enforcing repeated configs (like Hive sync or write/index tuning) across your entire data lake.
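The lookup order described above can be sketched as follows. This is a minimal illustration and not Hudi's own loader: the class and method names (HudiConfLocation, resolve) are hypothetical, and the environment is passed in as a map so the logic is easy to exercise.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

// Hypothetical helper: mirrors the documented lookup order, not Hudi's internal code.
final class HudiConfLocation {
    static final String DEFAULT_CONF_DIR = "/etc/hudi/conf";
    static final String CONF_FILE_NAME = "hudi-defaults.conf";

    // Returns the expected path of hudi-defaults.conf,
    // honoring HUDI_CONF_DIR when it is set to a non-empty value.
    static Path resolve(Map<String, String> env) {
        String dir = env.get("HUDI_CONF_DIR");
        if (dir == null || dir.isEmpty()) {
            dir = DEFAULT_CONF_DIR;
        }
        return Paths.get(dir, CONF_FILE_NAME);
    }
}
```

In a real job the map would typically come from System.getenv(), for example HudiConfLocation.resolve(System.getenv()).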
Spark Datasource Configs
These configs control the Hudi Spark Datasource, providing the ability to define keys/partitioning, pick the write operation, specify how to merge records, or choose the query type to read.
Read Options
Options useful for reading tables via read.format.option(...)
Config Class: org.apache.hudi.DataSourceOptions.scala
hoodie.datasource.read.incr.path.glob
For use cases where users want to incrementally pull from only certain partitions instead of the full table. This option allows using a glob pattern to filter directly on the path.
Default Value: (Optional)
Config Param: INCR_PATH_GLOB
hoodie.file.index.enable
Enables use of the spark file index implementation for Hudi, that speeds up listing of large tables.
Default Value: true (Optional)
Config Param: ENABLE_HOODIE_FILE_INDEX
hoodie.datasource.read.paths
Comma separated list of file paths to read within a Hudi table.
Default Value: N/A (Required)
Config Param: READ_PATHS
hoodie.datasource.read.end.instanttime
Instant time to limit incrementally fetched data to. New data written with an instant_time <= END_INSTANTTIME is fetched.
Default Value: N/A (Required)
Config Param: END_INSTANTTIME
hoodie.datasource.write.precombine.field
Field used in preCombining before actual write. When two records have the same key value, we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)
Default Value: ts (Optional)
Config Param: READ_PRE_COMBINE_FIELD
hoodie.datasource.read.incr.filters
For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies opaque map functions, filters appearing late in the sequence of transformations cannot be automatically pushed down. This option allows setting filters directly on Hoodie Source.
Default Value: (Optional)
Config Param: PUSH_DOWN_INCR_FILTERS
hoodie.datasource.merge.type
For snapshot queries on merge-on-read tables, controls whether we invoke the record payload implementation to merge (payload_combine) or skip merging altogether (skip_merge).
Default Value: payload_combine (Optional)
Config Param: REALTIME_MERGE
hoodie.datasource.read.begin.instanttime
Instant time to start incrementally pulling data from. The instant time here need not correspond to an instant on the timeline. New data written with an instant_time > BEGIN_INSTANTTIME is fetched. For example, '20170901080000' will get all new data written after Sep 1, 2017 08:00AM.
Default Value: N/A (Required)
Config Param: BEGIN_INSTANTTIME
hoodie.enable.data.skipping
Enables data-skipping allowing queries to leverage indexes to reduce the search space by skipping over files
Default Value: true (Optional)
Config Param: ENABLE_DATA_SKIPPING
Since Version: 0.10.0
as.of.instant
The query instant for time travel. If this option is not specified, the latest snapshot is queried.
Default Value: N/A (Required)
Config Param: TIME_TRAVEL_AS_OF_INSTANT
hoodie.datasource.query.type
Whether data needs to be read in incremental mode (new data since an instantTime), read optimized mode (obtain latest view, based on base files), or snapshot mode (obtain latest view, by merging base and (if any) log files).
Default Value: snapshot (Optional)
Config Param: QUERY_TYPE
hoodie.datasource.read.schema.use.end.instanttime
Uses the schema of the end instant when incrementally fetching data. Default: uses the latest instant schema.
Default Value: false (Optional)
Config Param: INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME
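As a hedged illustration of how the incremental read options above fit together, the sketch below assembles them into a plain map and spells out the documented bounds (begin is exclusive, end is inclusive). The class and method names are hypothetical; only the option keys and the "incremental" query type come from Hudi. The comparison assumes instants are fixed-width timestamp strings, so string order matches time order.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for assembling incremental-read options; not part of Hudi.
final class IncrementalReadOptions {
    // Builds reader options for an incremental query over (beginInstant, endInstant].
    static Map<String, String> build(String beginInstant, String endInstant) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("hoodie.datasource.query.type", "incremental");
        opts.put("hoodie.datasource.read.begin.instanttime", beginInstant);
        opts.put("hoodie.datasource.read.end.instanttime", endInstant);
        return opts;
    }

    // Illustrates the documented bounds: instant_time > begin and instant_time <= end.
    // Assumption: instants are same-length timestamp strings such as "20170901080000".
    static boolean inWindow(String instantTime, String beginInstant, String endInstant) {
        return instantTime.compareTo(beginInstant) > 0
            && instantTime.compareTo(endInstant) <= 0;
    }
}
```

The resulting map can be handed to Spark's DataFrameReader, for example spark.read().format("org.apache.hudi").options(opts).load(basePath), where spark and basePath are assumed to exist in the calling job.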
Write Options
You can pass down any of the WriteClient level configs directly using the options() or option(k,v) methods.
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) // any of the Hudi client opts can be passed in as well
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);
Options useful for writing tables via write.format.option(...)
Config Class: org.apache.hudi.DataSourceOptions.scala
hoodie.datasource.clustering.async.enable
Enable asynchronous clustering. Disabled by default.
Default Value: false (Optional)
Config Param: ASYNC_CLUSTERING_ENABLE
Since Version: 0.9.0
hoodie.datasource.write.operation
Whether to do upsert, insert or bulk_insert for the write operation. Use bulk_insert to load new data into a table, and from then on use upsert/insert. Bulk insert uses a disk-based write path to scale to large inputs without needing to cache them.
Default Value: upsert (Optional)
Config Param: OPERATION
hoodie.datasource.write.reconcile.schema
When a new write batch has records with an old schema but the latest table schema has evolved, this config upgrades the records to the latest table schema (default values are injected into missing fields). If it is not enabled, the write batch fails.
Default Value: false (Optional)
Config Param: RECONCILE_SCHEMA
hoodie.datasource.write.recordkey.field
Record key field. Value to be used as the recordKey component of HoodieKey. Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using the dot notation, e.g. a.b.c
Default Value: uuid (Optional)
Config Param: RECORDKEY_FIELD
hoodie.datasource.hive_sync.skip_ro_suffix
Skip the _ro suffix for Read optimized table, when registering
Default Value: false (Optional)
Config Param: HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE
hoodie.datasource.write.partitionpath.urlencode
Should we url encode the partition path value, before creating the folder structure.
Default Value: false (Optional)
Config Param: URL_ENCODE_PARTITIONING
hoodie.datasource.hive_sync.partition_extractor_class
Class which implements PartitionValueExtractor to extract the partition values, default 'SlashEncodedDayPartitionValueExtractor'.
Default Value: org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor (Optional)
Config Param: HIVE_PARTITION_EXTRACTOR_CLASS
hoodie.datasource.hive_sync.serde_properties
Serde properties to hive table.
Default Value: N/A (Required)
Config Param: HIVE_TABLE_SERDE_PROPERTIES
hoodie.datasource.hive_sync.password
hive password to use
Default Value: hive (Optional)
Config Param: HIVE_PASS
hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled
When set to true, a consistent value will be generated for a logical timestamp type column, like timestamp-millis and timestamp-micros, irrespective of whether row-writer is enabled. Disabled by default so as not to break pipelines that deploy either the fully row-writer path or the non row-writer path. For example, if it is kept disabled, a record key of timestamp type with value 2016-12-29 09:54:00 will be written as timestamp 2016-12-29 09:54:00.0 in the row-writer path, while it will be written as long value 1483023240000000 in the non row-writer path. If enabled, the timestamp value will be written in both cases.
Default Value: false (Optional)
Config Param: KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED
hoodie.datasource.hive_sync.support_timestamp
‘INT64’ with original type TIMESTAMP_MICROS is converted to hive ‘timestamp’ type. Disabled by default for backward compatibility.
Default Value: false (Optional)
Config Param: HIVE_SUPPORT_TIMESTAMP_TYPE
hoodie.datasource.hive_sync.create_managed_table
Whether to sync the table as managed table.
Default Value: false (Optional)
Config Param: HIVE_CREATE_MANAGED_TABLE
hoodie.datasource.clustering.inline.enable
Enable inline clustering. Disabled by default.
Default Value: false (Optional)
Config Param: INLINE_CLUSTERING_ENABLE
Since Version: 0.9.0
hoodie.datasource.compaction.async.enable
Controls whether async compaction should be turned on for MOR table writing.
Default Value: true (Optional)
Config Param: ASYNC_COMPACT_ENABLE
hoodie.datasource.meta.sync.enable
Default Value: false (Optional)
Config Param: META_SYNC_ENABLED
hoodie.datasource.write.streaming.ignore.failed.batch
Config to indicate whether to ignore any non exception error (e.g. writestatus error) within a streaming microbatch
Default Value: true (Optional)
Config Param: STREAMING_IGNORE_FAILED_BATCH
hoodie.datasource.write.precombine.field
Field used in preCombining before actual write. When two records have the same key value, we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)
Default Value: ts (Optional)
Config Param: PRECOMBINE_FIELD
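The pre-combine rule described above (for records sharing a key, keep the one with the largest precombine value) can be sketched in plain Java as follows. This is an illustration of the rule only, not Hudi's write path: the Rec type, its fields, and the tie-breaking choice (the first record seen wins on equal values) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: models the "largest precombine value wins" rule.
final class PreCombineSketch {
    // Hypothetical record shape: a record key, a precombine value ("ts"), and a payload.
    static final class Rec {
        final String key;
        final long ts;
        final String value;

        Rec(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    // Keeps one record per key, choosing the record with the largest ts.
    // Assumption: on equal ts the record seen first is kept.
    static List<Rec> preCombine(List<Rec> incoming) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : incoming) {
            latest.merge(r.key, r, (kept, candidate) -> candidate.ts > kept.ts ? candidate : kept);
        }
        return new ArrayList<>(latest.values());
    }
}
```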
hoodie.datasource.hive_sync.username
hive user name to use
Default Value: hive (Optional)
Config Param: HIVE_USER
hoodie.datasource.write.partitionpath.field
Partition path field. Value to be used as the partitionPath component of HoodieKey. Actual value is obtained by invoking .toString()
Default Value: N/A (Required)
Config Param: PARTITIONPATH_FIELD
hoodie.datasource.write.streaming.retry.count
Config to indicate how many times streaming job should retry for a failed micro batch.
Default Value: 3 (Optional)
Config Param: STREAMING_RETRY_CNT
hoodie.datasource.hive_sync.partition_fields
Field in the table to use for determining hive partition columns.
Default Value: (Optional)
Config Param: HIVE_PARTITION_FIELDS
hoodie.datasource.hive_sync.sync_as_datasource
Default Value: true (Optional)
Config Param: HIVE_SYNC_AS_DATA_SOURCE_TABLE
hoodie.sql.insert.mode
Insert mode when inserting data into a pk-table. The available modes are: upsert, strict and non-strict. In upsert mode, the insert statement performs an upsert on the pk-table, updating duplicate records. In strict mode, the insert statement enforces the primary key uniqueness constraint and does not allow duplicate records. In non-strict mode, Hudi just performs the insert operation on the pk-table.
Default Value: upsert (Optional)
Config Param: SQL_INSERT_MODE
hoodie.datasource.hive_sync.use_jdbc
Use JDBC when hive synchronization is enabled
Default Value: true (Optional)
Config Param: HIVE_USE_JDBC
Deprecated Version: 0.9.0
hoodie.meta.sync.client.tool.class
Sync tool class name used to sync to metastore. Defaults to Hive.
Default Value: org.apache.hudi.hive.HiveSyncTool (Optional)
Config Param: META_SYNC_CLIENT_TOOL_CLASS_NAME
hoodie.datasource.write.keygenerator.class
Key generator class, that implements org.apache.hudi.keygen.KeyGenerator
Default Value: org.apache.hudi.keygen.SimpleKeyGenerator (Optional)
Config Param: KEYGENERATOR_CLASS_NAME
hoodie.datasource.write.payload.class
Payload class used. Override this if you would like to roll your own merge logic when upserting/inserting. This will render any value set for PRECOMBINE_FIELD_OPT_VAL ineffective.
Default Value: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload (Optional)
Config Param: PAYLOAD_CLASS_NAME
hoodie.datasource.hive_sync.table_properties
Additional properties to store with table.
Default Value: N/A (Required)
Config Param: HIVE_TABLE_PROPERTIES
hoodie.datasource.hive_sync.jdbcurl
Hive metastore url
Default Value: jdbc:hive2://localhost:10000 (Optional)
Config Param: HIVE_URL
hoodie.datasource.hive_sync.batch_num
The number of partitions in one batch when synchronizing partitions to Hive.
Default Value: 1000 (Optional)
Config Param: HIVE_BATCH_SYNC_PARTITION_NUM
hoodie.datasource.hive_sync.assume_date_partitioning
Assume partitioning is yyyy/mm/dd
Default Value: false (Optional)
Config Param: HIVE_ASSUME_DATE_PARTITION
hoodie.datasource.hive_sync.bucket_sync
Whether to sync the Hive metastore bucket specification when using the bucket index. The specification is 'CLUSTERED BY (trace_id) SORTED BY (trace_id ASC) INTO 65536 BUCKETS'
Default Value: false (Optional)
Config Param: HIVE_SYNC_BUCKET_SYNC
hoodie.datasource.hive_sync.auto_create_database
Auto create the Hive database if it does not exist
Default Value: true (Optional)
Config Param: HIVE_AUTO_CREATE_DATABASE
hoodie.datasource.hive_sync.database
database to sync to
Default Value: default (Optional)
Config Param: HIVE_DATABASE
hoodie.datasource.write.streaming.retry.interval.ms
Config to indicate how long (in milliseconds) to wait before a retry is issued for a failed microbatch
Default Value: 2000 (Optional)
Config Param: STREAMING_RETRY_INTERVAL_MS
hoodie.sql.bulk.insert.enable
When set to true, the sql insert statement will use bulk insert.
Default Value: false (Optional)
Config Param: SQL_ENABLE_BULK_INSERT
hoodie.datasource.write.commitmeta.key.prefix
Option keys beginning with this prefix, are automatically added to the commit/deltacommit metadata. This is useful to store checkpointing information, in a consistent way with the hudi timeline
Default Value: _ (Optional)
Config Param: COMMIT_METADATA_KEYPREFIX
hoodie.datasource.write.drop.partition.columns
When set to true, will not write the partition columns into hudi. By default, false.
Default Value: false (Optional)
Config Param: DROP_PARTITION_COLUMNS
hoodie.datasource.hive_sync.enable
When set to true, register/sync the table to Apache Hive metastore
Default Value: false (Optional)
Config Param: HIVE_SYNC_ENABLED
hoodie.datasource.hive_sync.table
table to sync to
Default Value: unknown (Optional)
Config Param: HIVE_TABLE
hoodie.datasource.hive_sync.ignore_exceptions
Default Value: false (Optional)
Config Param: HIVE_IGNORE_EXCEPTIONS
hoodie.datasource.hive_sync.use_pre_apache_input_format
Flag to choose InputFormat under com.uber.hoodie package instead of org.apache.hudi package. Use this when you are in the process of migrating from com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to org.apache.hudi input format
Default Value: false (Optional)
Config Param: HIVE_USE_PRE_APACHE_INPUT_FORMAT
hoodie.datasource.write.table.type
The table type for the underlying data, for this write. This can’t change between writes.
Default Value: COPY_ON_WRITE (Optional)
Config Param: TABLE_TYPE
hoodie.datasource.write.row.writer.enable
When set to true, will perform write operations directly using the Spark native Row representation, avoiding any additional conversion costs.
Default Value: true (Optional)
Config Param: ENABLE_ROW_WRITER
hoodie.datasource.write.hive_style_partitioning
Flag to indicate whether to use Hive style partitioning. If set true, the names of partition folders follow <partition_column_name>=<partition_value> format. By default false (the names of partition folders are only partition values)
Default Value: false (Optional)
Config Param: HIVE_STYLE_PARTITIONING
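The two folder-naming schemes described above can be illustrated with a small sketch. This models only the naming rule for a single partition column; the class and method names are hypothetical, and it does not reproduce Hudi's key generators.

```java
// Illustrative only: shows the folder name produced with and without Hive style partitioning.
final class HiveStylePartitioning {
    // Returns "<partition_column_name>=<partition_value>" when hiveStyle is true,
    // otherwise just the partition value.
    static String folderName(String column, String value, boolean hiveStyle) {
        return hiveStyle ? column + "=" + value : value;
    }
}
```

For a hypothetical column region with value emea, the folder is named region=emea with the flag enabled and emea with the default setting.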
hoodie.datasource.hive_sync.conditional_sync
Enables conditional hive sync, where partition or schema change must exist to perform sync to hive.
Default Value: false (Optional)
Config Param: HIVE_CONDITIONAL_SYNC
hoodie.datasource.hive_sync.mode
Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.
Default Value: N/A (Required)
Config Param: HIVE_SYNC_MODE
hoodie.datasource.write.table.name
Table name for the datasource write. Also used to register the table into meta stores.
Default Value: N/A (Required)
Config Param: TABLE_NAME
hoodie.datasource.hive_sync.base_file_format
Base file format for the sync.
Default Value: PARQUET (Optional)
Config Param: HIVE_BASE_FILE_FORMAT
hoodie.deltastreamer.source.kafka.value.deserializer.class
This class is used by kafka client to deserialize the records
Default Value: io.confluent.kafka.serializers.KafkaAvroDeserializer (Optional)
Config Param: KAFKA_AVRO_VALUE_DESERIALIZER_CLASS
Since Version: 0.9.0
hoodie.datasource.write.insert.drop.duplicates
If set to true, filters out all duplicate records from incoming dataframe, during insert operations.
Default Value: false (Optional)
Config Param: INSERT_DROP_DUPS
hoodie.datasource.write.partitions.to.delete
Comma separated list of partitions to delete
Default Value: N/A (Required)
Config Param: PARTITIONS_TO_DELETE
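Several of the Hive sync options listed above are usually set together. The sketch below collects them into one map that could be passed to the writer via options() alongside the other write options. The helper class is hypothetical, and choosing hms as the sync mode is an assumption made for the example (the valid values listed above are hms, jdbc and hiveql).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: groups Hive sync options from this page into one map.
final class HiveSyncOptions {
    // database, table and partitionFields are supplied by the caller;
    // "hms" is an example choice for the sync mode, not a documented default.
    static Map<String, String> build(String database, String table, String partitionFields) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("hoodie.datasource.hive_sync.enable", "true");
        opts.put("hoodie.datasource.hive_sync.mode", "hms");
        opts.put("hoodie.datasource.hive_sync.database", database);
        opts.put("hoodie.datasource.hive_sync.table", table);
        opts.put("hoodie.datasource.hive_sync.partition_fields", partitionFields);
        return opts;
    }
}
```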
PreCommit Validator Configurations
The following set of configurations help validate new data before commits.
Config Class: org.apache.hudi.config.HoodiePreCommitValidatorConfig
hoodie.precommit.validators.single.value.sql.queries
Spark SQL queries to run on table before committing new data to validate state after commit. Multiple queries separated by ';' delimiter are supported. Expected result is included as part of query separated by '#'. Example query: 'query1#result1;query2#result2'. Note: the <TABLE_NAME> variable is expected to be present in the query.
Default Value: (Optional)
Config Param: SINGLE_VALUE_SQL_QUERIES
hoodie.precommit.validators.equality.sql.queries
Spark SQL queries to run on table before committing new data to validate state before and after commit. Multiple queries separated by ';' delimiter are supported. Example: "select count(*) from <TABLE_NAME>". Note: <TABLE_NAME> is replaced by table state before and after commit.
Default Value: (Optional)
Config Param: EQUALITY_SQL_QUERIES
hoodie.precommit.validators
Comma separated list of class names that can be invoked to validate commit
Default Value: (Optional)
Config Param: VALIDATOR_CLASS_NAMES
hoodie.precommit.validators.inequality.sql.queries
Spark SQL queries to run on table before committing new data to validate state before and after commit. Multiple queries separated by ';' delimiter are supported. Example query: 'select count(*) from <TABLE_NAME> where col=null'. Note: the <TABLE_NAME> variable is expected to be present in the query.
Default Value: (Optional)
Config Param: INEQUALITY_SQL_QUERIES
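To make the value format of hoodie.precommit.validators.single.value.sql.queries concrete, the sketch below splits a config value into query and expected-result pairs, using ';' between queries and '#' before the expected result as described above. It is an illustration of the format only, not Hudi's validator code: the class name is hypothetical, and splitting on the last '#' of each entry is an assumption of this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative parser for the "query#expected;query#expected" format.
final class SingleValueQueries {
    // Returns an ordered map of query text to its expected single-value result.
    static Map<String, String> parse(String configValue) {
        Map<String, String> expected = new LinkedHashMap<>();
        for (String entry : configValue.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing ';'
            }
            int sep = trimmed.lastIndexOf('#');
            if (sep < 0) {
                throw new IllegalArgumentException("Missing '#<expected result>' in: " + trimmed);
            }
            expected.put(trimmed.substring(0, sep).trim(), trimmed.substring(sep + 1).trim());
        }
        return expected;
    }
}
```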
Flink Sql Configs
These configs control the Hudi Flink SQL source/sink connectors, providing the ability to define record keys, pick the write operation, specify how to merge records, enable/disable asynchronous compaction, or choose the query type to read.
Flink Options
Flink jobs using SQL can be configured through the options in the WITH clause. The actual datasource level configs are listed below.
Config Class: org.apache.hudi.configuration.FlinkOptions
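As a rough sketch of how such options end up in a WITH clause, the helper below renders a map of option keys and values as the clause text that would follow a CREATE TABLE definition. The helper class is hypothetical and only formats a string; single quotes inside values are doubled, as is usual in SQL string literals.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical formatter: turns option key/value pairs into a Flink SQL WITH clause.
final class FlinkWithClause {
    // Renders one "'key' = 'value'" line per entry, in the map's iteration order.
    static String render(Map<String, String> options) {
        return options.entrySet().stream()
            .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
            .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }
}
```

A caller would typically pass an insertion-ordered map containing the connector options ('connector' = 'hudi' and a table 'path', neither of which is listed on this page) followed by any of the options below, such as read.streaming.enabled.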
write.bulk_insert.sort_by_partition
Whether to sort the inputs by partition path for bulk insert tasks, default true
Default Value: true (Optional)
Config Param: WRITE_BULK_INSERT_SORT_BY_PARTITION
read.streaming.enabled
Whether to read as streaming source, default false
Default Value: false (Optional)
Config Param: READ_AS_STREAMING
hoodie.datasource.write.keygenerator.type
Key generator type, which determines how the key is extracted from the incoming record. Note: This is being actively worked on. Please use hoodie.datasource.write.keygenerator.class instead.
Default Value: SIMPLE (Optional)
Config Param: KEYGEN_TYPE
compaction.trigger.strategy
Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits; 'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction; 'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; 'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits'
Default Value: num_commits (Optional)
Config Param: COMPACTION_TRIGGER_STRATEGY
index.state.ttl
Index state ttl in days, default stores the index permanently
Default Value: 0.0 (Optional)
Config Param: INDEX_STATE_TTL
compaction.max_memory
Max memory in MB for compaction spillable map, default 100MB
Default Value: 100 (Optional)
Config Param: COMPACTION_MAX_MEMORY
hive_sync.support_timestamp
INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type. Disabled by default for backward compatibility.
Default Value: false (Optional)
Config Param: HIVE_SYNC_SUPPORT_TIMESTAMP
hive_sync.skip_ro_suffix
Skip the _ro suffix for Read optimized table when registering, default false
Default Value: false (Optional)
Config Param: HIVE_SYNC_SKIP_RO_SUFFIX
metadata.compaction.delta_commits
Max delta commits for metadata table to trigger compaction, default 10
Default Value: 10 (Optional)
Config Param: METADATA_COMPACTION_DELTA_COMMITS
hive_sync.assume_date_partitioning
Assume partitioning is yyyy/mm/dd, default false
Default Value: false (Optional)
Config Param: HIVE_SYNC_ASSUME_DATE_PARTITION
write.bulk_insert.shuffle_by_partition