配置
该页面介绍了几种配置写入或读取Hudi数据集的作业的方法。 简而言之,您可以在几个级别上控制行为。
- Spark数据源配置 : 这些配置控制Hudi Spark数据源,提供如下功能: 定义键和分区、选择写操作、指定如何合并记录或选择要读取的视图类型。
- WriteClient 配置 : 在内部,Hudi数据源使用基于RDD的
HoodieWriteClient
API 真正执行对存储的写入。 这些配置可对文件大小、压缩(compression)、并行度、压缩(compaction)、写入模式、清理等底层方面进行完全控制。 尽管Hudi提供了合理的默认设置,但在不同情形下,可能需要对这些配置进行调整以针对特定的工作负载进行优化。 - RecordPayload 配置 : 这是Hudi提供的最底层的定制。
RecordPayload定义了如何根据传入的新记录和存储的旧记录来产生新值以进行插入更新。
Hudi提供了诸如
OverwriteWithLatestAvroPayload
的默认实现,该实现仅使用最新或最后写入的记录来更新存储。 在数据源和WriteClient级别,都可以将其重写为扩展HoodieRecordPayload
类的自定义类。
Spark数据源配置
可以通过将以下选项传递到option(k,v)
方法中来配置使用数据源的Spark作业。
实际的数据源级别配置在下面列出。
写选项
另外,您可以使用options()
或option(k,v)
方法直接传递任何WriteClient级别的配置。
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) // 任何Hudi客户端选项都可以传入
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);
用于通过write.format.option(...)
写入数据集的选项
TABLE_NAME_OPT_KEY
属性:hoodie.datasource.write.table.name
[必须]
是否为写操作进行插入更新、插入或批量插入。使用`bulkinsert`将新数据加载到表中,之后使用`upsert`或`insert`。 批量插入使用基于磁盘的写入路径来扩展以加载大量输入,而无需对其进行缓存。#### STORAGE_TYPE_OPT_KEY 属性:`hoodie.datasource.write.storage.type`, 默认值:`COPY_ON_WRITE`
此写入的基础数据的存储类型。两次写入之间不能改变。#### PRECOMBINE_FIELD_OPT_KEY 属性:`hoodie.datasource.write.precombine.field`, 默认值:`ts`
实际写入之前在preCombining中使用的字段。 当两个记录具有相同的键值时,我们将使用Object.compareTo(..)从precombine字段中选择一个值最大的记录。
PAYLOAD_CLASS_OPT_KEY
属性:hoodie.datasource.write.payload.class
, 默认值:org.apache.hudi.OverwriteWithLatestAvroPayload
记录键字段。用作`HoodieKey`中`recordKey`部分的值。 实际值将通过在字段值上调用.toString()来获得。可以使用点符号指定嵌套字段,例如:`a.b.c`
PARTITIONPATH_FIELD_OPT_KEY
属性:hoodie.datasource.write.partitionpath.field
, 默认值:partitionpath
HIVE_STYLE_PARTITIONING_OPT_KEY
属性:hoodie.datasource.write.hive_style_partitioning
, 默认值:false
WriteClient 配置
直接使用RDD级别api进行编程的Jobs可以构建一个HoodieWriteConfig
对象,并将其传递给HoodieWriteClient
构造函数。
HoodieWriteConfig可以使用以下构建器模式构建。
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.forTable(tableName)
.withSchema(schemaStr)
.withProps(props) // 从属性文件传递原始k、v对。
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withXXX(...).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withXXX(...).build())
...
.build();
以下各节介绍了写配置的不同方面,并解释了最重要的配置及其属性名称和默认值。
withPath(hoodie_base_path)
属性:hoodie.base.path
[必须]
withSchema(schema_str)
属性:hoodie.avro.schema
[必须]
forTable(table_name)
属性:hoodie.table.name
[必须]
withBulkInsertParallelism(bulk_insert_parallelism = 1500)
属性:hoodie.bulkinsert.shuffle.parallelism
withParallelism(insert_shuffle_parallelism = 1500, upsert_shuffle_parallelism = 1500)
属性:hoodie.insert.shuffle.parallelism
, hoodie.upsert.shuffle.parallelism
combineInput(on_insert = false, on_update=true)
属性:hoodie.combine.before.insert
, hoodie.combine.before.upsert
withWriteStatusStorageLevel(level = MEMORY_AND_DISK_SER)
属性:hoodie.write.status.storage.level
withAutoCommit(autoCommit = true)
属性:hoodie.auto.commit
withAssumeDatePartitioning(assumeDatePartitioning = false)
属性:hoodie.assume.date.partitioning
withConsistencyCheckEnabled(enabled = false)
属性:hoodie.consistency.check.enabled
索引配置
以下配置控制索引行为,该行为将传入记录标记为对较旧记录的插入或更新。
withIndexConfig (HoodieIndexConfig)
withIndexClass(indexClass = "x.y.z.UserDefinedIndex")
属性:hoodie.index.class
要使用的索引类型。默认为布隆过滤器。可能的选项是[BLOOM | HBASE | INMEMORY]。 布隆过滤器消除了对外部系统的依赖,并存储在Parquet数据文件的页脚中
bloomFilterNumEntries(numEntries = 60000)
属性:hoodie.index.bloom.num_entries
这是要存储在布隆过滤器中的条目数。 我们假设maxParquetFileSize为128MB,averageRecordSize为1024B,因此,一个文件中的记录总数约为130K。 默认值(60000)大约是此近似值的一半。[HUDI-56](https://issues.apache.org/jira/browse/HUDI-56) 描述了如何动态地对此进行计算。 警告:将此值设置得太低,将产生很多误报,并且索引查找将必须扫描比其所需的更多的文件;如果将其设置得非常高,将线性增加每个数据文件的大小(每50000个条目大约4KB)。
bloomFilterFPP(fpp = 0.000000001)
属性:hoodie.index.bloom.fpp
根据条目数允许的错误率。 这用于计算应为布隆过滤器分配多少位以及哈希函数的数量。通常将此值设置得很低(默认值:0.000000001),我们希望在磁盘空间上进行权衡以降低误报率
bloomIndexPruneByRanges(pruneRanges = true)
属性:hoodie.bloom.index.prune.by.ranges
为true时,从文件框定信息,可以加快索引查找的速度。 如果键具有单调递增的前缀,例如时间戳,则特别有用。
bloomIndexUseCaching(useCaching = true)
属性:hoodie.bloom.index.use.caching
为true时,将通过减少用于计算并行度或受影响分区的IO来缓存输入的RDD以加快索引查找
bloomIndexTreebasedFilter(useTreeFilter = true)
属性:hoodie.bloom.index.use.treebased.filter
为true时,启用基于间隔树的文件过滤优化。与暴力模式相比,此模式可根据键范围加快文件过滤速度
bloomIndexBucketizedChecking(bucketizedChecking = true)
属性:hoodie.bloom.index.bucketized.checking
为true时,启用了桶式布隆过滤。这减少了在基于排序的布隆索引查找中看到的偏差
bloomIndexKeysPerBucket(keysPerBucket = 10000000)
属性:hoodie.bloom.index.keys.per.bucket
此配置控制“存储桶”的大小,该大小可跟踪对单个文件进行的记录键检查的次数,并且是分配给执行布隆过滤器查找的每个分区的工作单位。 较高的值将分摊将布隆过滤器读取到内存的固定成本。
bloomIndexParallelism(0)
属性:hoodie.bloom.index.parallelism
这是索引查找的并行度,其中涉及Spark Shuffle。 默认情况下,这是根据输入的工作负载特征自动计算的
hbaseZkQuorum(zkString) [必须]
属性:hoodie.index.hbase.zkquorum
hbaseZkPort(port) [必须]
属性:hoodie.index.hbase.zkport
hbaseZkZnodeParent(zkZnodeParent) [必须]
属性:hoodie.index.hbase.zknode.path
hbaseTableName(tableName) [必须]
属性:hoodie.index.hbase.table
bloomIndexUpdatePartitionPath(updatePartitionPath = false)
属性:hoodie.bloom.index.update.partition.path
为true时,当对一个已有记录执行包含分区路径的更新操作时,将会导致把新记录插入到新分区,而把原有记录从旧分区里删除。为false时,只对旧分区的原有记录进行更新。
存储选项
控制有关调整parquet和日志文件大小的方面。
withStorageConfig (HoodieStorageConfig)
limitFileSize (size = 120MB)
属性:hoodie.parquet.max.file.size
parquetBlockSize(rowgroupsize = 120MB)
属性:hoodie.parquet.block.size
parquetPageSize(pagesize = 1MB)
属性:hoodie.parquet.page.size
parquetCompressionRatio(parquetCompressionRatio = 0.1)
属性:hoodie.parquet.compression.ratio
parquetCompressionCodec(parquetCompressionCodec = gzip)
属性:hoodie.parquet.compression.codec
logFileMaxSize(logFileSize = 1GB)
属性:hoodie.logfile.max.size
logFileDataBlockMaxSize(dataBlockSize = 256MB)
属性:hoodie.logfile.data.block.max.size
logFileToParquetCompressionRatio(logFileToParquetCompressionRatio = 0.35)
属性:hoodie.logfile.to.parquet.compression.ratio
Parquet文件的压缩编解码方式
压缩配置
压缩配置用于控制压缩(将日志文件合并到新的parquet基本文件中)、清理(回收较旧及未使用的文件组)。
withCompactionConfig (HoodieCompactionConfig)
withCleanerPolicy(policy = KEEP_LATEST_COMMITS)
属性:hoodie.cleaner.policy
retainCommits(no_of_commits_to_retain = 24)
属性:hoodie.cleaner.commits.retained
archiveCommitsWith(minCommits = 96, maxCommits = 128)
属性:hoodie.keep.min.commits
, hoodie.keep.max.commits
withCommitsArchivalBatchSize(batch = 10)
属性:hoodie.commits.archival.batch
compactionSmallFileSize(size = 0)
属性:hoodie.parquet.small.file.limit
insertSplitSize(size = 500000)
属性:hoodie.copyonwrite.insert.split.size
autoTuneInsertSplits(true)
属性:hoodie.copyonwrite.insert.auto.split
approxRecordSize(size = 1024)
属性:hoodie.copyonwrite.record.size.estimate
withInlineCompaction(inlineCompaction = false)
属性:hoodie.compact.inline
withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommitsBeforeCompaction = 10)
属性:hoodie.compact.inline.max.delta.commits
withCompactionLazyBlockReadEnabled(true)
属性:hoodie.compaction.lazy.block.read
withCompactionReverseLogReadEnabled(false)
属性:hoodie.compaction.reverse.log.read
withCleanerParallelism(cleanerParallelism = 200)
属性:hoodie.cleaner.parallelism
withCompactionStrategy(compactionStrategy = org.apache.hudi.io.compact.strategy.LogFileSizeBasedCompactionStrategy)
属性:hoodie.compaction.strategy
withTargetIOPerCompactionInMB(targetIOPerCompactionInMB = 500000)
属性:hoodie.compaction.target.io
withTargetPartitionsPerDayBasedCompaction(targetPartitionsPerCompaction = 10)
属性:hoodie.compaction.daybased.target
withPayloadClass(payloadClassName = org.apache.hudi.common.model.HoodieAvroPayload)
属性:hoodie.compaction.payload.class
指标配置
配置Hudi指标报告。
withMetricsConfig (HoodieMetricsConfig)
GRAPHITE
on(metricsOn = false)
属性:hoodie.metrics.on
withReporterType(reporterType = GRAPHITE)
属性:hoodie.metrics.reporter.type
toGraphiteHost(host = localhost)
属性:hoodie.metrics.graphite.host
onGraphitePort(port = 4756)
属性:hoodie.metrics.graphite.port
usePrefix(prefix = "")
属性:hoodie.metrics.graphite.metric.prefix
JMX
on(metricsOn = false)
属性:hoodie.metrics.on
withReporterType(reporterType = JMX)
属性:hoodie.metrics.reporter.type
toJmxHost(host = localhost)
属性:hoodie.metrics.jmx.host
onJmxPort(port = 1000-5000)
属性:hoodie.metrics.graphite.port
DATADOG
on(metricsOn = false)
属性:hoodie.metrics.on
withReporterType(reporterType = DATADOG)
属性: hoodie.metrics.reporter.type
withDatadogReportPeriodSeconds(period = 30)
属性: hoodie.metrics.datadog.report.period.seconds
withDatadogApiSite(apiSite)
属性: hoodie.metrics.datadog.api.site
withDatadogApiKey(apiKey)
属性: hoodie.metrics.datadog.api.key
withDatadogApiKeySkipValidation(skip = false)
属性: hoodie.metrics.datadog.api.key.skip.validation
withDatadogApiKeySupplier(apiKeySupplier)
属性: hoodie.metrics.datadog.api.key.supplier
withDatadogApiTimeoutSeconds(timeout = 3)
属性: hoodie.metrics.datadog.metric.prefix
withDatadogPrefix(prefix)
属性: hoodie.metrics.datadog.metric.prefix
withDatadogHost(host)
属性: hoodie.metrics.datadog.metric.host
withDatadogTags(tags)
属性: hoodie.metrics.datadog.metric.tags
用户自定义发送器
on(metricsOn = false)
属性: hoodie.metrics.on
withReporterClass(className = "")
属性: hoodie.metrics.reporter.class
内存配置
控制由Hudi内部执行的压缩和合并的内存使用情况
withMemoryConfig (HoodieMemoryConfig)
withMaxMemoryFractionPerPartitionMerge(maxMemoryFractionPerPartitionMerge = 0.6)
属性:hoodie.memory.merge.fraction
withMaxMemorySizePerCompactionInBytes(maxMemorySizePerCompactionInBytes = 1GB)
属性:hoodie.memory.compaction.fraction
withWriteStatusFailureFraction(failureFraction = 0.1)
属性:hoodie.memory.writestatus.failure.fraction
写提交回调配置
控制写提交的回调。 如果用户启用了回调并且回调过程发生了错误,则会抛出异常。 当前支持HTTP, Kafka 两种回调方式。
withCallbackConfig (HoodieWriteCommitCallbackConfig)
writeCommitCallbackOn(callbackOn = false)
Property: hoodie.write.commit.callback.on
withCallbackClass(callbackClass)
Property: hoodie.write.commit.callback.class
HTTP CALLBACK
通过 HTTP 发送写提交回调信息. 这是默认的实现方式,用户不需要显式指定。
withCallbackHttpUrl(url)
Property: hoodie.write.commit.callback.http.url
withCallbackHttpTimeoutSeconds(timeoutSeconds = 3)
Property: hoodie.write.commit.callback.http.timeout.seconds
withCallbackHttpApiKey(apiKey)
Property: hoodie.write.commit.callback.http.api.key
KAFKA CALLBACK
使用Kafka发送写提交回调信息, 用户需要配置 hoodie.write.commit.callback.class
= org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback
CALLBACK_KAFKA_BOOTSTRAP_SERVERS
Property: hoodie.write.commit.callback.kafka.bootstrap.servers
CALLBACK_KAFKA_TOPIC
Property: hoodie.write.commit.callback.kafka.topic
CALLBACK_KAFKA_PARTITION
Property: hoodie.write.commit.callback.kafka.partition
CALLBACK_KAFKA_ACKS
Property: hoodie.write.commit.callback.kafka.acks
CALLBACK_KAFKA_RETRIES
Property: hoodie.write.commit.callback.kafka.retries