Streaming Ingestion
Hudi Streamer
The following classes were renamed and relocated to the org.apache.hudi.utilities.streamer package:
- DeltastreamerMultiWriterCkptUpdateFunc is renamed to StreamerMultiWriterCkptUpdateFunc
- DeltaSync is renamed to StreamSync
- HoodieDeltaStreamer is renamed to HoodieStreamer
- HoodieDeltaStreamerMetrics is renamed to HoodieStreamerMetrics
- HoodieMultiTableDeltaStreamer is renamed to HoodieMultiTableStreamer
To maintain backward compatibility, the original classes are still present in the org.apache.hudi.utilities.deltastreamer package, but they have been deprecated.
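Scripts that still reference the deprecated class names can be updated mechanically. The sketch below maps each old fully qualified name to its replacement, using only the renames listed above. The function name `new_streamer_class` is a hypothetical helper for illustration and is not part of Hudi.

```shell
#!/bin/sh
# Hypothetical helper: print the new class name for a deprecated one.
# Names that are not in the rename list are printed unchanged.
new_streamer_class() {
  old_pkg="org.apache.hudi.utilities.deltastreamer"
  new_pkg="org.apache.hudi.utilities.streamer"
  case "$1" in
    "$old_pkg.DeltastreamerMultiWriterCkptUpdateFunc")
      echo "$new_pkg.StreamerMultiWriterCkptUpdateFunc" ;;
    "$old_pkg.DeltaSync")
      echo "$new_pkg.StreamSync" ;;
    "$old_pkg.HoodieDeltaStreamer")
      echo "$new_pkg.HoodieStreamer" ;;
    "$old_pkg.HoodieDeltaStreamerMetrics")
      echo "$new_pkg.HoodieStreamerMetrics" ;;
    "$old_pkg.HoodieMultiTableDeltaStreamer")
      echo "$new_pkg.HoodieMultiTableStreamer" ;;
    *)
      echo "$1" ;;
  esac
}

new_streamer_class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
# prints org.apache.hudi.utilities.streamer.HoodieStreamer
```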
The HoodieStreamer utility (part of hudi-utilities-bundle) provides a way to ingest from different sources such as DFS or Kafka, with the following capabilities:
- Exactly once ingestion of new events from Kafka, incremental imports from Sqoop, output of HiveIncrementalPuller, or files under a DFS folder
- Support for JSON, Avro, or custom record types for the incoming data
- Manage checkpoints, rollback & recovery
- Leverage Avro schemas from DFS or Confluent schema registry.
- Support for plugging in transformations
The command line options describe these capabilities in more detail:
[hoodie]$ spark-submit --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help
Usage: <main class> [options]
Options:
--checkpoint
Resume Hudi Streamer from this checkpoint.
--commit-on-errors
Commit even when some records failed to be written
Default: false
--compact-scheduling-minshare
Minshare for compaction as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 0
--compact-scheduling-weight
Scheduling weight for compaction as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 1
--continuous
Hudi Streamer runs in continuous mode running source-fetch -> Transform
-> Hudi Write in loop
Default: false
--delta-sync-scheduling-minshare
Minshare for delta sync as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 0
--delta-sync-scheduling-weight
Scheduling weight for delta sync as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 1
--disable-compaction
Compaction is enabled for MoR table by default. This flag disables it
Default: false
--enable-hive-sync
Enable syncing to hive (Deprecated in favor of --enable-sync and --sync-tool-classes)
Default: false
--enable-sync
Enable syncing meta
Default: false
--sync-tool-classes
Classes (comma-separated) to be used for syncing meta. Shall be used only when --enable-sync or --enable-hive-sync is set to true
Note: When used with deprecated --enable-hive-sync flag, HiveSyncTool will always be run along with any other classes mentioned in here.
Default: org.apache.hudi.hive.HiveSyncTool
--filter-dupes
Should duplicate records from source be dropped/filtered out before
insert/bulk-insert
Default: false
--help, -h
--hoodie-conf
Any configuration that can be set in the properties file (using the CLI
parameter "--propsFilePath") can also be passed command line using this
parameter
Default: []
--max-pending-compactions
Maximum number of outstanding inflight/requested compactions. Delta Sync
will not happen unlessoutstanding compactions is less than this number
Default: 5
--min-sync-interval-seconds
the min sync interval of each sync in continuous mode
Default: 0
--op
Takes one of these values : UPSERT (default), INSERT (use when input is
purely new data/inserts to gain speed)
Default: UPSERT
Possible Values: [UPSERT, INSERT, BULK_INSERT]
--payload-class
subclass of HoodieRecordPayload, that works off a GenericRecord.
Implement your own, if you want to do something other than overwriting
existing value
Default: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
--props
path to properties file on localfs or dfs, with configurations for
hoodie client, schema provider, key generator and data source. For
hoodie client props, sane defaults are used, but recommend use to
provide basic things like metrics endpoints, hive configs etc. For
sources, referto individual classes, for supported properties.
Default: file:///Users/vinoth/bin/hoodie/src/test/resources/streamer-config/dfs-source.properties
--schemaprovider-class
subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach
schemas to input & target table data, built in options:
org.apache.hudi.utilities.schema.FilebasedSchemaProvider.Source (See
org.apache.hudi.utilities.sources.Source) implementation can implement
their own SchemaProvider. For Sources that return Dataset<Row>, the
schema is obtained implicitly. However, this CLI option allows
overriding the schemaprovider returned by Source.
--source-class
Subclass of org.apache.hudi.utilities.sources to read data. Built-in
options: org.apache.hudi.utilities.sources.{JsonDFSSource (default),
AvroDFSSource, AvroKafkaSource, CsvDFSSource, HiveIncrPullSource,
JdbcSource, JsonKafkaSource, ORCDFSSource, ParquetDFSSource,
S3EventsHoodieIncrSource, S3EventsSource, SqlSource}
Default: org.apache.hudi.utilities.sources.JsonDFSSource
--source-limit
Maximum amount of data to read from source. Default: No limit For e.g:
DFS-Source => max bytes to read, Kafka-Source => max events to read
Default: 9223372036854775807
--source-ordering-field
Field within source record to decide how to break ties between records
with same key in input data. Default: 'ts' holding unix timestamp of
record
Default: ts
--spark-master
spark master to use.
Default: local[2]
* --table-type
Type of table. COPY_ON_WRITE (or) MERGE_ON_READ
* --target-base-path
base path for the target hoodie table. (Will be created if did not exist
first time around. If exists, expected to be a hoodie table)
* --target-table
name of the target table in Hive
--transformer-class
subclass of org.apache.hudi.utilities.transform.Transformer. Allows
transforming raw source Dataset to a target Dataset (conforming to
target schema) before writing. Default : Not set. E:g -
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which
allows a SQL query templated to be passed as a transformation function)
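As an illustration of how several of these options combine, the following sketch wraps a continuous-mode run against a MERGE_ON_READ table. Every flag comes from the help output above. The function name, the `SPARK_SUBMIT` override, the argument order, and the 60-second interval are assumptions made for illustration.

```shell
#!/bin/sh
# Sketch: run HoodieStreamer in continuous mode on a MERGE_ON_READ table.
# Arguments: <bundle jar> <props file> <target base path> <target table>
# Set SPARK_SUBMIT=echo to print the arguments instead of submitting a job.
run_streamer_continuous() {
  "${SPARK_SUBMIT:-spark-submit}" \
    --class org.apache.hudi.utilities.streamer.HoodieStreamer "$1" \
    --props "$2" \
    --table-type MERGE_ON_READ \
    --target-base-path "$3" \
    --target-table "$4" \
    --continuous \
    --min-sync-interval-seconds 60
}
```

Because `--source-class` is omitted, the documented default (JsonDFSSource) applies. Setting `SPARK_SUBMIT=echo` before calling the function is a cheap way to check the assembled arguments without a Spark installation.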
The tool takes a hierarchically composed property file and has pluggable interfaces for extracting data, key generation and providing schema. Sample configs for ingesting from kafka and dfs are
provided under hudi-utilities/src/test/resources/streamer-config.
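For orientation, a Kafka source properties file might look roughly like the sketch below. The key names follow the `hoodie.streamer.*` prefix used elsewhere on this page plus ordinary Kafka consumer settings. The exact keys, the `localhost` endpoints, the field names, and the output path are all assumptions, so check them against the sample files and the source classes for your Hudi version.

```shell
#!/bin/sh
# Sketch: write a minimal Kafka source properties file (all values are assumptions).
PROPS_FILE="${PROPS_FILE:-/tmp/kafka-source.properties}"
cat > "$PROPS_FILE" <<'EOF'
# Record key and partition path fields in the incoming records
hoodie.datasource.write.recordkey.field=impressionid
hoodie.datasource.write.partitionpath.field=userid
# Schema registry endpoint read by the schema provider
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
# Kafka topic to consume
hoodie.streamer.source.kafka.topic=impressions
# Plain Kafka consumer properties
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
schema.registry.url=http://localhost:8081
EOF
echo "wrote $PROPS_FILE"
```

The resulting file can then be passed to HoodieStreamer through `--props`.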
For example, once Confluent Kafka and Schema Registry are up and running, produce some test data using the following command (impressions.avro is provided by the schema-registry repo)
[confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid
and then ingest it as follows.
[hoodie]$ spark-submit --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impressiontime \
--target-base-path file:///tmp/hudi-streamer-op \
--target-table uber.impressions \
--op BULK_INSERT
In some cases, you may want to migrate your existing table into Hudi beforehand. Please refer to the migration guide.
Note on hudi utilities bundle usage for different spark versions
Starting with the 0.11.0 release, Hudi provides a new hudi-utilities-slim-bundle that excludes dependencies which can
cause conflicts and compatibility issues with different versions of Spark. If running HoodieStreamer with
hudi-utilities-bundle alone runs into compatibility issues, use the hudi-utilities-slim-bundle together with the
Hudi Spark bundle that corresponds to your Spark version, e.g.,
--packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.13.0,org.apache.hudi:hudi-spark3.1-bundle_2.12:0.13.0
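The coordinate pair can be assembled from the Scala, Spark, and Hudi versions in use. The sketch below only builds the `--packages` value for the versions quoted above. The variable names are illustrative, and the versions should be replaced with ones that match your cluster.

```shell
#!/bin/sh
# Sketch: build the --packages value that pairs the slim utilities bundle
# with the Spark bundle for one Spark line (variable names are illustrative).
HUDI_VERSION="0.13.0"
SCALA_VERSION="2.12"
SPARK_LINE="3.1"

PACKAGES="org.apache.hudi:hudi-utilities-slim-bundle_${SCALA_VERSION}:${HUDI_VERSION}"
PACKAGES="${PACKAGES},org.apache.hudi:hudi-spark${SPARK_LINE}-bundle_${SCALA_VERSION}:${HUDI_VERSION}"

echo "--packages ${PACKAGES}"
```

Keeping the Hudi version in one variable helps ensure both bundles come from the same release.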
MultiTableStreamer
HoodieMultiTableStreamer, a wrapper on top of HoodieStreamer, ingests multiple tables into Hudi datasets in a single run. Currently it supports only sequential processing of the tables to be ingested and only the COPY_ON_WRITE storage type. Its command line options are largely the same as those of HoodieStreamer, except that you must provide table-wise configs in separate files in a dedicated config folder. The following command line options are introduced:
* --config-folder
the path to the folder which contains all the table wise config files
--base-path-prefix
a common prefix that lets users create all the Hudi datasets for related tables under one path in the file system. The datasets are then created under the path <base_path_prefix>/<database>/<table_to_be_ingested>. You can override the path for any table by setting the property hoodie.streamer.ingestion.targetBasePath
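To make the default layout concrete, the sketch below derives the dataset path from a base path prefix and a `<database>.<table>` name. It only mirrors the layout described above and is not Hudi's own implementation. The function name `default_target_path` is hypothetical.

```shell
#!/bin/sh
# Sketch: derive <base_path_prefix>/<database>/<table> from "<database>.<table>".
default_target_path() {
  prefix="${1%/}"        # drop one trailing slash, if any
  database="${2%%.*}"    # text before the first dot
  table="${2#*.}"        # text after the first dot
  echo "$prefix/$database/$table"
}

default_target_path file:///tmp/hudi-streamer-op db1.table1
# prints file:///tmp/hudi-streamer-op/db1/table1
```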
The following properties must be set to ingest data using HoodieMultiTableStreamer:
hoodie.streamer.ingestion.tablesToBeIngested
comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2
hoodie.streamer.ingestion.targetBasePath
if you wish to ingest a particular table in a separate path, you can mention that path here
hoodie.streamer.ingestion.<database>.<table>.configFile
path to the config file in dedicated config folder which contains table overridden properties for the particular table to be ingested.
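Put together, these properties might be laid out as in the sketch below, which writes one common file and one per-table override into a config folder. The property keys are the ones listed above. The folder location, file names, table names, and the custom base path are assumptions for illustration.

```shell
#!/bin/sh
# Sketch: common properties for two tables plus a per-table override.
# Paths, file names, and table names are examples only.
CONFIG_DIR="${CONFIG_DIR:-/tmp/hudi-ingestion-config}"
mkdir -p "$CONFIG_DIR"

# Common file: which tables to ingest and where each table's config lives.
cat > "$CONFIG_DIR/multi-table.properties" <<EOF
hoodie.streamer.ingestion.tablesToBeIngested=db1.table1,db1.table2
hoodie.streamer.ingestion.db1.table1.configFile=file://$CONFIG_DIR/db1_table1.properties
hoodie.streamer.ingestion.db1.table2.configFile=file://$CONFIG_DIR/db1_table2.properties
EOF

# Per-table override: place db1.table2 outside the default base path.
cat > "$CONFIG_DIR/db1_table2.properties" <<EOF
hoodie.streamer.ingestion.targetBasePath=file:///tmp/custom/db1_table2
EOF
```

The file for db1.table1 is not created here. It would hold that table's own overrides, such as its key fields and source settings.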
Sample config files for table wise overridden properties can be found under hudi-utilities/src/test/resources/streamer-config. The command to run HoodieMultiTableStreamer is also similar to how you run HoodieStreamer.
[hoodie]$ spark-submit --class org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties \
--config-folder file:///tmp/hudi-ingestion-config \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impressiontime \
--base-path-prefix file:///tmp/hudi-streamer-op \
--target-table uber.impressions \
--op BULK_INSERT
For detailed information on how to configure and use HoodieMultiTableStreamer, please refer to the blog section.