写入 Hudi 数据集
这一节我们将介绍使用DeltaStreamer工具从外部源甚至其他Hudi数据集摄取新更改的方法, 以及通过使用Hudi数据源的upserts加快大型Spark作业的方法。 对于此类数据集,我们可以使用各种查询引擎查询它们。
写 操作
在此之前,了解Hudi数据源及delta streamer工具提供的三种不同的写操作以及如何最佳利用它们可能会有所帮助。 这些操作可以在针对数据集发出的每个提交/增量提交中进行选择/更改。
- UPSERT(插入更新) :这是默认操作,在该操作中,通过查找索引,首先将输入记录标记为插入或更新。 在运行启发式方法以确定如何最好地将这些记录放到存储上,如优化文件大小之后,这些记录最终会被写入。 对于诸如数据库更改捕获之类的用例,建议该操作,因为输入几乎肯定包含更新。
- INSERT(插入) :就使用启发式方法确定文件大小而言,此操作与插入更新(UPSERT)非常相似,但此操作完全跳过了索引查找步骤。 因此,对于日志重复数据删除等用例(结合下面提到的过滤重复项的选项),它可以比插入更新快得多。 插入也适用于这种用例,这种情况数据集可以允许重复项,但只需要Hudi的事务写/增量提取/存储管理功能。
- BULK_INSERT(批插入) :插入更新和插入操作都将输入记录保存在内存中,以加快存储优化启发式计算的速度(以及其它未提及的方面)。 所以对Hudi数据集进行初始加载/引导时这两种操作会很低效。批量插入提供与插入相同的语义,但同时实现了基于排序的数据写入算法, 该算法可以很好地扩展数百TB的初始负载。但是,相比于插入和插入更新能保证文件大小,批插入在调整文件大小上只能尽力而为。
DeltaStreamer
HoodieDeltaStreamer
实用工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。
- 从Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件 增量导入
- 支持json、avro或自定义记录类型的传入数据
- 管理检查点,回滚和恢复
- 利用DFS或Confluent schema注册表的Avro模式。
- 支持自定义转换操作
命令行选项更详细地描述了这些功能:
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help
Usage: <main class> [options]
Options:
--commit-on-errors
Commit even when some records failed to be written
Default: false
--enable-hive-sync
Enable syncing to hive
Default: false
--filter-dupes
Should duplicate records from source be dropped/filtered outbefore
insert/bulk-insert
Default: false
--help, -h
--hudi-conf
Any configuration that can be set in the properties file (using the CLI
parameter "--propsFilePath") can also be passed command line using this
parameter
Default: []
--op
Takes one of these values : UPSERT (default), INSERT (use when input is
purely new data/inserts to gain speed)
Default: UPSERT
Possible Values: [UPSERT, INSERT, BULK_INSERT]
--payload-class
subclass of HoodieRecordPayload, that works off a GenericRecord.
Implement your own, if you want to do something other than overwriting
existing value
Default: org.apache.hudi.OverwriteWithLatestAvroPayload
--props
path to properties file on localfs or dfs, with configurations for
Hudi client, schema provider, key generator and data source. For
Hudi client props, sane defaults are used, but recommend use to
provide basic things like metrics endpoints, hive configs etc. For
sources, referto individual classes, for supported properties.
Default: file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties
--schemaprovider-class
subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach
schemas to input & target table data, built in options:
FilebasedSchemaProvider
Default: org.apache.hudi.utilities.schema.FilebasedSchemaProvider
--source-class
Subclass of org.apache.hudi.utilities.sources to read data. Built-in
options: org.apache.hudi.utilities.sources.{JsonDFSSource (default),
AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}
Default: org.apache.hudi.utilities.sources.JsonDFSSource
--source-limit
Maximum amount of data to read from source. Default: No limit For e.g:
DFSSource => max bytes to read, KafkaSource => max events to read
Default: 9223372036854775807
--source-ordering-field
Field within source record to decide how to break ties between records
with same key in input data. Default: 'ts' holding unix timestamp of
record
Default: ts
--spark-master
spark master to use.
Default: local[2]
* --target-base-path
base path for the target Hudi dataset. (Will be created if did not
exist first time around. If exists, expected to be a Hudi dataset)
* --target-table
name of the target table in Hive
--transformer-class
subclass of org.apache.hudi.utilities.transform.Transformer. UDF to
transform raw source dataset to a target dataset (conforming to target
schema) before writing. Default : Not set. E:g -
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which
allows a SQL query template to be passed as a transformation function)
该工具采用层次结构组成的属性文件,并具有可插拔的接口,用于提取数据、生成密钥和提供模式。
从Kafka和DFS摄取数据的示例配置在这里:hudi-utilities/src/test/resources/delta-streamer-config
。
例如:当您让Confluent Kafka、Schema注册表启动并运行后,可以用这个命令产生一些测试数据 (impressions.avro, 由schema-registry代码库提供)
[confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid
然后用如下命令摄取这些数据。
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--target-base-path file:///tmp/hudi-deltastreamer-op --target-table uber.impressions \
--op BULK_INSERT
在某些情况下,您可能需要预先将现有数据集迁移到Hudi。 请参考迁移指南。
Datasource Writer
hudi-spark
模块提供了DataSource API,可以将任何DataFrame写入(也可以读取)到Hudi数据集中。
以下是在指定需要使用的字段名称的之后,如何插入更新DataFrame的方法,这些字段包括
recordKey => _row_key
、partitionPath => partition
和precombineKey => timestamp
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) // 可以传入任何Hudi客户端参数
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);