配置
该页面介绍了几种配置写入或读取Hudi数据集的作业的方法。 简而言之,您可以在几个级别上控制行为。
- Spark数据源配置 : 这些配置控制Hudi Spark数据源,提供如下功能: 定义键和分区、选择写操作、指定如何合并记录或选择要读取的视图类型。
- WriteClient 配置 : 在内部,Hudi数据源使用基于RDD的
HoodieWriteClient
API 真正执行对存储的写入。 这些配置可对文件大小、压缩(compression)、并行度、压缩(compaction)、写入模式、清理等底层方面进行完全控制。 尽管Hudi提供了合理的默认设置,但在不同情形下,可能需要对这些配置进行调整以针对特定的工作负载进行优化。 - RecordPayload 配置 : 这是Hudi提供的最底层的定制。
RecordPayload定义了如何根据传入的新记录和存储的旧记录来产生新值以进行插入更新。
Hudi提供了诸如
OverwriteWithLatestAvroPayload
的默认实现,该实现仅使用最新或最后写入的记录来更新存储。 在数据源和WriteClient级别,都可以将其重写为扩展HoodieRecordPayload
类的自定义类。
Spark数据源配置
可以通过将以下选项传递到option(k,v)
方法中来配置使用数据源的Spark作业。
实际的数据源级别配置在下面列出。
写选项
另外,您可以使用options()
或option(k,v)
方法直接传递任何WriteClient级别的配置。
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) // 任何Hudi客户端选项都可以传入
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);
用于通过write.format.option(...)
写入数据集的选项
TABLE_NAME_OPT_KEY
属性:hoodie.datasource.write.table.name
[必须]
Hive表名,用于将数 据集注册到其中。
OPERATION_OPT_KEY
属性:hoodie.datasource.write.operation
, 默认值:upsert
是否为写操作进行插入更新、插入或批量插入。使用bulkinsert
将新数据加载到表中,之后使用upsert
或insert
。
批量插入使用基于磁盘的写入路径来扩展以加载大量输入,而无需对其进行缓存。
STORAGE_TYPE_OPT_KEY
属性:hoodie.datasource.write.storage.type
, 默认值:COPY_ON_WRITE
此写入的基础数据的存储类型。两次写入之间不能改变。
PRECOMBINE_FIELD_OPT_KEY
属性:hoodie.datasource.write.precombine.field
, 默认值:ts
实际写入之前在preCombining中使用的字段。
当两个记录具有相同的键值时,我们将使用Object.compareTo(..)从precombine字段中选择一个值最大的记录。
PAYLOAD_CLASS_OPT_KEY
属性:hoodie.datasource.write.payload.class
, 默认值:org.apache.hudi.OverwriteWithLatestAvroPayload
使用的有效载荷类。如果您想在插入更新或插入时使用自己的合并逻辑,请重写此方法。
这将使得PRECOMBINE_FIELD_OPT_VAL
设置的任何值无效
RECORDKEY_FIELD_OPT_KEY
属性:hoodie.datasource.write.recordkey.field
, 默认值:uuid
记录键字段。用作HoodieKey
中recordKey
部分的值。
实际值将通过在字段值上调用.toString()来获得。可以使用点符号指定嵌套字段,例如:a.b.c
PARTITIONPATH_FIELD_OPT_KEY
属性:hoodie.datasource.write.partitionpath.field
, 默认值:partitionpath
分区路径字段。用作HoodieKey
中partitionPath
部分的值。
通过调用.toString()获得实际的值
HIVE_STYLE_PARTITIONING_OPT_KEY
属性:hoodie.datasource.write.hive_style_partitioning
, 默认值:false
如果设置为true,则生成基于Hive格式的partition目录:<= END_INSTANTTIME`写入的新数据。
WriteClient 配置
直接使用RDD级别api进行编程的Jobs可以构建一个HoodieWriteConfig
对象,并 将其传递给HoodieWriteClient
构造函数。
HoodieWriteConfig可以使用以下构建器模式构建。
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.forTable(tableName)
.withSchema(schemaStr)
.withProps(props) // 从属性文件传递原始k、v对。
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withXXX(...).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withXXX(...).build())
...
.build();
以下各节介绍了写配置的不同方面,并解释了最重要的配置及其属性名称和默认值。
withPath(hoodie_base_path)
属性:hoodie.base.path
[必须]
创建所有数据分区所依据的基本DFS路径。
始终在前缀中明确指明存储方式(例如hdfs://,s3://等)。
Hudi将有关提交、保存点、清理审核日志等的所有主要元数据存储在基本目录下的.hoodie目录中。
withSchema(schema_str)
属性:hoodie.avro.schema
[必须]
这是数据集的当前读取器的avro模式(schema)。
这是整个模式的字符串。HoodieWriteClient使用此模式传递到HoodieRecordPayload的实现,以从源格式转换为avro记录。
在更新过程中重写记录时也使用此模式。
forTable(table_name)
属性:hoodie.table.name
[必须]
数据集的表名,将用于在Hive中注册。每次运行需要相同。
withBulkInsertParallelism(bulk_insert_parallelism = 1500)
属性:hoodie.bulkinsert.shuffle.parallelism
批量插入旨在用于较大的初始导入,而此处的并行度决定了数据集中文件的初始数量。
调整此值以达到在初始导入期间所需的最佳尺寸。
withParallelism(insert_shuffle_parallelism = 1500, upsert_shuffle_parallelism = 1500)
属性:hoodie.insert.shuffle.parallelism
, hoodie.upsert.shuffle.parallelism
最初导入数据后,此并行度将控制用于读取输入记录的初始并行度。
确保此值足够高,例如:1个分区用于1 GB的输入数据
combineInput(on_insert = false, on_update=true)
属性:hoodie.combine.before.insert
, hoodie.combine.before.upsert
在DFS中插入或更新之前先组合输入RDD并将多个部分记录合并为单个记录的标志
withWriteStatusStorageLevel(level = MEMORY_AND_DISK_SER)
属性:hoodie.write.status.storage.level
HoodieWriteClient.insert和HoodieWriteClient.upsert返回一个持久的RDD[WriteStatus],
这是因为客户端可以选择检查WriteStatus并根据失败选择是否提交。这是此RDD的存储级别的配置
withAutoCommit(autoCommit = true)
属性:hoodie.auto.commit
插入和插入更新后,HoodieWriteClient是否应该自动提交。
客户端可以选择关闭自动提交,并在"定义的成功条件"下提交
withAssumeDatePartitioning(assumeDatePartitioning = false)
属性:hoodie.assume.date.partitioning
HoodieWriteClient是否应该假设数据按日期划分,即从基本路径划分为三个级别。
这是支持 < 0.3.1版本创建的表的一个补丁。最终将被删除
withConsistencyCheckEnabled(enabled = false)
属性:hoodie.consistency.check.enabled
HoodieWriteClient是否应该执行其他检查,以确保写入的文件在基础文件系统/存储上可列出。
将其设置为true可以解决S3的最终一致性模型,并确保作为提交的一部分写入的所有数据均能准确地用于查询。
索引配置
以下配置控制索引行为,该行为将传入记录标记为对较旧记录的插入或更新。
withIndexConfig (HoodieIndexConfig)
可插入以具有外部索引(HBase)或使用存储在Parquet文件中的默认布隆过滤器(bloom filter)
withIndexClass(indexClass = "x.y.z.UserDefinedIndex")
属性:hoodie.index.class
用户自定义索引的全路径名,索引类必须为HoodieIndex的子类,当指定该配置时,其会优先于hoodie.index.type
配置
withIndexType(indexType = BLOOM)
属性:hoodie.index.type
要使用的索引类型。默认为布隆过滤器。可能的选项是[BLOOM | HBASE | INMEMORY]。
布隆过滤器消除了对外部系统的依赖,并存储在Parquet数据文件的页脚中
bloomFilterNumEntries(numEntries = 60000)
属性:hoodie.index.bloom.num_entries
仅在索引类型为BLOOM时适用。
这是要存储在布隆过滤器中的条目数。
我们假设maxParquetFileSize为128MB,averageRecordSize为1024B,因此,一个文件中的记录总数约为130K。
默认值(60000)大约是此近似值的一半。HUDI-56
描述了如何动态地对此进行计算。
警告:将此值设置得太低,将产生很多误报,并且索引查找将必须扫描比其所需的更多的文件;如果将其设置得非常高,将线性增加每个数据文件的大小(每50000个条目大约4KB)。
bloomFilterFPP(fpp = 0.000000001)
属性:hoodie.index.bloom.fpp
仅在索引类型为BLOOM时适用。
根据条目数允许的错误率。
这用于计算应为布隆过滤器分配多少位以及哈希函数的数量。通常将此值设置得很低(默认值:0.000000001),我们希望在磁盘空间上进行权衡以降低误报率
bloomIndexPruneByRanges(pruneRanges = true)
属性:hoodie.bloom.index.prune.by.ranges
仅在索引类型为BLOOM时适用。
为true时,从文件框定信息,可以加快索引查找的速度。 如果键具有单调递增的前缀,例如时间戳,则特别有用。
bloomIndexUseCaching(useCaching = true)
属性:hoodie.bloom.index.use.caching
仅在索引类型为BLOOM时适用。
为true时,将通过减少用于计算并行度或受影响分区的IO来缓存输入的RDD以加快 索引查找
bloomIndexTreebasedFilter(useTreeFilter = true)
属性:hoodie.bloom.index.use.treebased.filter
仅在索引类型为BLOOM时适用。
为true时,启用基于间隔树的文件过滤优化。与暴力模式相比,此模式可根据键范围加快文件过滤速度
bloomIndexBucketizedChecking(bucketizedChecking = true)
属性:hoodie.bloom.index.bucketized.checking
仅在索引类型为BLOOM时适用。
为true时,启用了桶式布隆过滤。这减少了在基于排序的布隆索引查找中看到的偏差
bloomIndexKeysPerBucket(keysPerBucket = 10000000)
属性:hoodie.bloom.index.keys.per.bucket
仅在启用bloomIndexBucketizedChecking并且索引类型为bloom的情况下适用。
此配置控制“存储桶”的大小,该大小可跟踪对单个文件进行的记录键检查的次数,并且是分配给执行布隆过滤器查找的每个分区的工作单位。
较高的值将分摊将布隆过滤器读取到内存的固定成本。
bloomIndexParallelism(0)
属性:hoodie.bloom.index.parallelism
仅在索引类型为BLOOM时适用。
这是索引查找的并行度,其中涉及Spark Shuffle。 默认情况下,这是根据输入的工作负载特征自动计算的
hbaseZkQuorum(zkString) [必须]
属性:hoodie.index.hbase.zkquorum
仅在索引类型为HBASE时适用。要连接的HBase ZK Quorum URL。
hbaseZkPort(port) [必须]
属性:hoodie.index.hbase.zkport
仅在索引类型为HBASE时适用。要连接的HBase ZK Quorum端口。
hbaseZkZnodeParent(zkZnodeParent) [必须]
属性:hoodie.index.hbase.zknode.path
仅在索引类型为HBASE时适用。这是根znode,它将包含HBase创建及使用的所有znode。
hbaseTableName(tableName) [必须]
属性:hoodie.index.hbase.table
仅在索引类型为HBASE时适用。HBase表名称,用作索引。Hudi将row_key和[partition_path, fileID, commitTime]映射存储在表中。
bloomIndexUpdatePartitionPath(updatePartitionPath = false)
属性:hoodie.bloom.index.update.partition.path
仅在索引类型为GLOBAL_BLOOM时适用。
为true时,当对一个已有记录执行包含分区路径的更新操作时,将会导致把新记录插入到新分区,而把原有记录从旧分区里删除。为false时,只对旧分区的原有记录进行更新。
存储选项
控制有关调整parquet和日志文件大小的方面。
withStorageConfig (HoodieStorageConfig)
limitFileSize (size = 120MB)
属性:hoodie.parquet.max.file.size
Hudi写阶段生成的parquet文件的目标大小。对于DFS,这需要与基础文件系统块大小保持一致,以实现最佳性能。
parquetBlockSize(rowgroupsize = 120MB)
属性:hoodie.parquet.block.size
Parquet行组大小。最好与文件大小相同,以便将文件中的单个列连续存储在磁盘上
parquetPageSize(pagesize = 1MB)
属性:hoodie.parquet.page.size
Parquet页面大小。页面是parquet文件中的读取单位。 在一个块内,页面被分别压缩。
parquetCompressionRatio(parquetCompressionRatio = 0.1)
属性:hoodie.parquet.compression.ratio
当Hudi尝试调整新parquet文件的大小时,预期对parquet数据进行压缩的比例。
如果bulk_insert生成的文件小于预期大小,请增加此值
parquetCompressionCodec(parquetCompressionCodec = gzip)
属性:hoodie.parquet.compression.codec
Parquet压缩编解码方式名称。默认值为gzip。可能的选项是[gzip | snappy | uncompressed | lzo]
logFileMaxSize(logFileSize = 1GB)
属性:hoodie.logfile.max.size
LogFile的最大大小。这是在将日志文件移到下一个版本之前允许的最大大小。
logFileDataBlockMaxSize(dataBlockSize = 256MB)
属性:hoodie.logfile.data.block.max.size
LogFile数据块的最大大小。这是允许将单个数据块附加到日志文件的最大大小。
这有助于确保附加到日志文件的数据被分解为可调整大小的块,以防止发生OOM错误。此大小应大于JVM内存。
logFileToParquetCompressionRatio(logFileToParquetCompressionRatio = 0.35)
属性:hoodie.logfile.to.parquet.compression.ratio
随着记录从日志文件移动到parquet,预期会进行额外压缩的比例。
用于merge_on_read存储,以将插入内容发送到日志文件中并控制压缩parquet文件的大小。
parquetCompressionCodec(parquetCompressionCodec = gzip)
属性:hoodie.parquet.compression.codec
Parquet文件的压缩编解码方式
压缩配置
压缩配置用于控制压缩(将日志文件合并到新的parquet基本文件中)、清理(回收较旧及未使用的文件组)。
withCompactionConfig (HoodieCompactionConfig)
withCleanerPolicy(policy = KEEP_LATEST_COMMITS)
属性:hoodie.cleaner.policy
要使用的清理政策。Hudi将删除旧版本的parquet文件以回收空间。
任何引用此版本文件的查询和计算都将失败。最好确保数据保留的时间超过最大查询执行时间。
retainCommits(no_of_commits_to_retain = 24)
属性:hoodie.cleaner.commits.retained
保留的提交数。因此,数据将保留为num_of_commits * time_between_commits(计划的)。
这也直接转化为您可以逐步提取此数据集的数量
archiveCommitsWith(minCommits = 96, maxCommits = 128)
属性:hoodie.keep.min.commits
, hoodie.keep.max.commits
每个提交都是.hoodie
目录中的一个小文件。由于DFS通常不支持大量小文件,因此Hudi将较早的提交归档到顺序日志中。
提交通过重命名提交文件以原子方式发布。
withCommitsArchivalBatchSize(batch = 10)
属性:hoodie.commits.archival.batch