Skip to main content
Version: 0.9.0

Flink 指南

本指南提供了使用 Flink SQL 操作 Hudi 的文档。阅读本指南,您可以学习如何快速开始使用 Flink 读写 Hudi,同时对配置和任务优化有更深入的了解:

快速开始

安装

我们推荐使用 Flink Sql Client 来读写 Hudi,因为 Flink sql client 对于 SQL 用户来说更容易上手。

我们推荐使用 Flink-1.12.x 来读写 Hudi。 你可以按照 Flink 安装文档 的指导来安装 Flink。 hudi-flink-bundle.jar 使用的是 scala 2.11,所以我们推荐 Flink-1.12.x 配合 scala 2.11 来使用。

在 Hadoop 环境下启动 standalone 的 Flink 集群。 在你启动 Flink 集群前,我们推荐先配置如下参数:

  • $FLINK_HOME/conf/flink-conf.yaml 中添加配置:taskmanager.numberOfTaskSlots: 4
  • $FLINK_HOME/conf/flink-conf.yaml 中,根据数据量大小和集群大小来添加其他的 Flink 配置
  • $FLINK_HOME/conf/workers 中添加4核 localhost 来保证我们本地集群中有4个 workers

启动集群:

# HADOOP_HOME 是 Hadoop 的根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# 启动 Flink standalone 集群
./bin/start-cluster.sh

Hudi 将 Flink 板块单独打包为 hudi-flink-bundle.jar,该 Jar 包需要在启动的时候加载。 你可以在 hudi-source-dir/packaging/hudi-flink-bundle 下手动的打包这个 Jar 包,或者从 Apache Official Repository 中下载。

启动 Flink SQL Client:

# HADOOP_HOME 是 Hadoop 的根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell

小提示:

  • 为了兼容大部分的对象存储,我们推荐使用 Hadoop 2.9 x+的 Hadoop 版本
  • flink-parquet 和 flink-avro 格式已经打包在 hudi-flink-bundle.jar 中了

根据下面的不同功能来设置表名,存储路径和操作类型。 Flink SQL Client 是逐行执行 SQL 的。

插入数据

先创建一个 Flink Hudi 表,然后在通过下面的 VALUES 语句往该表中插入数据。

-- 为了更直观的显示结果,推荐把 CLI 的输出结果模式设置为 tableau。
set execution.result-mode=tableau;

CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'schema://base-path',
'table.type' = 'MERGE_ON_READ' -- 创建一个 MERGE_ON_READ类型的表,默认是 COPY_ON_ERITE
);

-- 使用 values 语句插入数据
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

查询数据

-- 从 Hudi 表中查询
select * from t1;

该查询语句提供的是 快照读(Snapshot Querying)。 如果想了解更多关于表类型和查询类型的介绍,可以参考文档 表类型和查询类型

更新数据

数据的更新和插入数据类似:

-- 这条语句会更新 key 为 'id1' 的记录
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

需要注意的是:现在使用的存储类型为 Append。通常我们都是使用 apennd 模式,除非你是第一次创建这个表。 再次 查询数据 就会显示更新后的结果。 每一次的插入操作都会在时间轴上生成一个带时间戳的新的 commit,在元数据字段 _hoodie_commit_time 和同一 _hoodie_record_keyage字段中查看更新。

流式查询

Hudi Flink 也有能力来查询从指定时间戳开始的流式记录集合。 该功能可以使用Hudi的流式查询,只需要提供一个查询开始的时间戳就可以完成查询。如果我们需要的 是指定时间戳后的所有数据,我们就不需要指定结束时间。

CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'oss://vvr-daily/hudi/t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- 该参数开启流式查询
'read.streaming.start-commit' = '20210316134557' -- 指定开始的时间戳
'read.streaming.check-interval' = '4' -- 指定检查新的commit的周期,默认是60秒
);

-- 开启流式的查询
select * from t1;

上述的查询会查询出 read.streaming.start-commit 时间戳后的所有数据。该功能的特殊在于可以同时在流和批的 pipeline 上执行。

删除数据

流式查询 中使用数据时,Hudi Flink 源还可以接受来自底层数据源的更改日志,然后可以按行级别应用更新和删除。所以,你可以在 Hudi 上同步各种 RDBMS 的近实时快照。

在使用 Flink 前,你需要在 $FLINK_HOME/conf/flink-conf.yaml 中设置一些全局的 Flink 配置。

并行度

名称默认值类型描述
taskmanager.numberOfTaskSlots1Integer单个 TaskManager 可以运行的并行 task 数。我们建议将该值设置为 > 4,实际值需根据数据量进行设置
parallelism.default1Integer当用户为指定算子并行度时,会使用这个并行度(默认值是1)。例如 write.bucket_assign.tasks 没有设置,就会使用这个默认值

内存

名称默认值类型描述
jobmanager.memory.process.size(none)MemorySizeJobManager 的总进程内存大小。代表 JobManager JVM 进程消耗的所有内存,包括总的 Flink 内存、JVM 元空间和 JVM 开销
taskmanager.memory.task.heap.size(none)MemorySizeTaskExecutor 的堆内存大小。这是为写缓存保留的 JVM 堆内存大小
taskmanager.memory.managed.size(none)MemorySize这是内存管理器管理的堆外内存的大小,用于排序和 RocksDB 状态后端。如果选择 RocksDB 作为状态后端,则需要设置此内存

Checkpoint

名称默认值类型描述
execution.checkpointing.interval(none)Duration设置该值的方式为 execution.checkpointing.interval = 150000ms,其中150000ms = 2.5min。 设置这个参数等同于开启了 Checkpoint
state.backend(none)String保存状态信息的状态后端。 我们推荐设置状态后端为 rocksdbstate.backend: rocksdb
state.backend.rocksdb.localdir(none)StringRocksDB 存储状态信息的路径
state.checkpoints.dir(none)StringCheckpoint 的默认路径,用于在支持 Flink 的文件系统中存储检查点的数据文件和元数据。存储路径必须可从所有参与进程/节点(即所有 TaskManager 和 JobManager)访问,如 HDFS 和 OSS 路径
state.backend.incrementalfalseBoolean选项状态后端是否创建增量检查点。对于增量检查点,只存储与前一个检查点的差异,而不是完整的检查点状态。如果存储状态设置为 rocksdb,建议打开这个选项

表参数

对于单个作业的配置,我们可以在 Flink SQL 语句的 WITH中设置。 所以,作业级别的配置如下:

内存

note

我们在内存调优的时候需要先关注 TaskManager 的数量和 内存 配置,以及 write task 的并发(write.tasks: 4 的值),确认每个 write task 能够分配到足够的内存, 再考虑以下相关的内存参数设置。

名称说明默认值备注
write.task.max.size一个 write task 的最大可用内存。 默认为 1024MB1024D当前预留给 write buffer 的内存为 write.task.max.size - compaction.max_memory。 当 write task 的内存 buffer 达到该阈值后会将内存里最大的 buffer flush
write.batch.sizeFlink 的 write task 为了提高写数据效率,会按照写 bucket 提前缓存数据,每个 bucket 的数据在内存达到阈值之前会一直缓存在内存中,当阈值达到会把数据 buffer 传递给 Hudi 的 writer 执行写操作。默认为 64MB64D推荐使用默认值
write.log_block.sizeHudi 的 log writer 在收到 write task 的数据后不会马上 flush 数据,writer 是以 LogBlock 为单位往磁盘刷数据的,在 LogBlock 攒够之前 records 会以序列化字节的形式缓存在 writer 内部。默认为 128MB128推荐使用默认值
write.merge.max_memory当表的类型为 COPY_ON_WRITE,Hudi 会合并增量数据和 base file 中的数据。增量的数据会缓存在内存的 map 结构里,这个 map 是可溢写的,这个参数控制了 map 可以使用的堆内存大小。 默认为 100MB100推荐使用默认值
compaction.max_memorywrite.merge.max_memory 类似,只是发生在压缩时。默认为 100MB100如果是在线 compaction,资源充足时可以开大些,比如 1024MB

并行度

名称说明默认值备注
write.taskswriter 的并发,每个 writer 顺序写 1~N 个 buckets。默认为 44增加这个并发,对小文件个数没影响
write.bucket_assign.tasksbucket assigner 的并发,默认使用 Flink 默认并发 parallelism.defaultparallelism.default增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件(小 bucket)数
write.index_boostrap.tasksIndex bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 Checkpoint,因此需要设置多一些的 Checkpoint 失败容忍次数。 默认使用 Flink 默认并发 parallelism.defaultparallelism.default只在 index.bootsrap.enabledtrue 时生效
read.tasks读算子的并发(batch 和 stream。默认为 44
compaction.tasks在线 compaction 的并发。默认为 1010在线 compaction 比较耗费资源,建议使用 离线 compaction

Compaction

note

这些参数都只服务于在线 compaction。

note

通过设置compaction.async.enabled = false 来关闭在线 compaction,但是 compaction.schedule.enable 仍然建议开启。之后通过离线 compaction 直接执行在线 compaction 产生的 compaction plan。

名称说明默认值备注
compaction.schedule.enabled是否阶段性生成 compaction plantrue建议开启,即使 compaction.async.enabled = false
compaction.async.enabled是否开启异步压缩,MOR 表时默认开启true通过关闭此参数来关闭 在线 compaction
compaction.trigger.strategy压缩策略num_commits可选择的策略有 num_commits:达到 N 个 delta commits 时触发 compaction; time_elapsed:距离上次 compaction 超过 N 秒触发 compaction ; num_and_timeNUM_COMMITSTIME_ELAPSED 同时满足; num_or_timeNUM_COMMITS 或者 TIME_ELAPSED 中一个满足
compaction.delta_commits默认策略,5 个 delta commits 触发一次压缩5--
compaction.delta_seconds默认 1 小时触发一次压缩3600--
compaction.max_memorycompaction 的 hashMap 可用内存。 默认值为 100MB100资源够用的话,建议调整到 1024MB
compaction.target_io每个 compaction plan 的 IO 内存上限 (读和写)。 默认值为 5GB5120离线 compaction 的默认值是 500GB

内存优化

MOR

  1. 把 Flink 的状态后端设置为 rocksdb (默认的 in memory 状态后端非常的消耗内存)
  2. 如果内存足够,compaction.max_memory 可以设置得更大些(默认为 100MB,可以调大到 1024MB
  3. 关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的内存大小。 比如 taskManager 的内存是 4GB, 运行了 2StreamWriteFunction,那每个 write function 能分到 2GB,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如 BucketAssignFunction)也会消耗一些内存
  4. 需要关注 compaction 的内存变化。 compaction.max_memory 控制了每个 compaction task 读 log 时可以利用的内存大小。compaction.tasks 控制了 compaction task 的并发

COW

  1. 把 Flink 的状态后端设置为 rocksdb (默认的 in memory 状态后端非常的消耗内存)
  2. 同时调大 write.task.max.sizewrite.merge.max_memory (默认值分别是 1024MB100MB,可以调整为 2014MB1024MB
  3. 关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的内存大小。 比如 taskManager 的内存是 4GB, 运行了 2StreamWriteFunction,那每个 write function 能分到 2GB,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如 BucketAssignFunction)也会消耗一些内存

离线批量导入

针对存量数据导入的需求,如果存量数据来源于其他数据源,可以使用离线批量导入功能(bulk_insert),快速将存量数据导入 Hudi。

note

bulk_insert 省去了 avro 的序列化以及数据的 merge 过程,后续也不会再有去重操作。所以,数据的唯一性需要自己来保证。

note

bulk_insertbatch execution mode 模式下执行更加高效。 batch execution mode 模式默认会按照 partition path 排序输入消息再写入 Hudi, 避免 file handle 频繁切换导致性能下降。

note

bulk_insert 的 write tasks 的并发是通过参数 write.tasks 来指定,并发的数量会影响到小文件的数量,理论上,bulk_insert 的 write tasks 的并发数就是划分的 bucket 数, 当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会回滚到新的文件句柄,所以最后:写文件数量 >= write.bucket_assign.tasks

参数

名称Required默认值备注
write.operationtrueupsert开启 bulk_insert 功能
write.tasksfalse4bulk_insert write tasks 的并发,最后的文件数 >= write.bucket_assign.tasks
write.bulk_insert.shuffle_by_partitionfalsetrue是否将数据按照 partition 字段 shuffle 后,再通过 write task 写入,开启该参数将减少小文件的数量,但是有数据倾斜的风险
write.bulk_insert.sort_by_partitionfalsetrue是否将数据线按照 partition 字段排序后,再通过 write task 写入,当一个 write task 写多个 partition时,开启可以减少小文件数量
write.sort.memoryfalse128sort 算子的可用 managed memory(单位 MB)。默认为 128 MB

全量接增量

针对全量数据导入后,接增量的需求。如果已经有全量的离线 Hudi 表,需要接上实时写入,并且保证数据不重复,可以开启 全量接增量(index bootstrap)功能。

note

如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启 写入限流 )。

参数

名称Required默认值备注
index.bootstrap.enabledtruefalse开启 index bootstrap 索引加载后,会将已存在的 Hudi 表的数据一次性加载到 state 中
index.partition.regexfalse*设置正则表达式进行分区筛选,默认为加载全部分区

使用流程

  1. CREATE TABLE 创建和 Hudi 表对应的语句,注意 table.type 必须正确
  2. 设置 index.bootstrap.enabled = true 开启索引加载功能
  3. flink-conf.yaml 中设置 Checkpoint 失败容忍 :execution.checkpointing.tolerable-failed-checkpoints = n(取决于checkpoint 调度次数)
  4. 等待第一次 Checkpoint 完成,表示索引加载完成
  5. 索引加载完成后可以退出并保存 savepoint(也可以直接用 externalized checkpoint)
  6. 重启任务,将 index.bootstrap.enable 设置为 false,参数配置到合适的大小
note
  1. 索引加载是阻塞式,所以在索引加载过程中 Checkpoint 无法完成
  2. 索引加载由数据流触发,需要确保每个 partition 都至少有1条数据,即上游 source 有数据进来
  3. 索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索 finish loading the index under partitionLoad record form file 日志内容来观察索引加载的进度
  4. 第一次 Checkpoint 成功就表示索引已经加载完成,后续从 Checkpoint 恢复时无需再次加载索引

Changelog 模式

针对使用 Hudi 保留消息的所有变更(I / -U / U / D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算)的需求,Hudi 的 MOR 表 通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。

参数

名称Required默认值备注
changelog.enabledfalsefalse默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更
note

批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。

note

设置 changelog.enabletrue 后,中间的变更也只是 best effort:异步的压缩任务会将中间变更合并成 1 条,所以如果流读消费不够及时,被压缩后 只能读到最后一条记录。当然,通过调整压缩的缓存时间可以预留一定的时间缓冲给 reader,比如调整压缩的两个参数:compaction.delta_commits and compaction.delta_seconds

Insert 模式

当前 Hudi 对于 Insert 模式 默认会采用小文件策略:MOR 会追加写 avro log 文件,COW 会不断合并之前的 parquet 文件(并且增量的数据会去重),这样会导致性能下降。

如果想关闭文件合并,可以设置 write.insert.deduplicatefalse。 关闭后,不会有任何的去重行为,每次 flush 都是直接写独立的 parquet(MOR 表也会直接写 parquet)。

参数

名称Required默认值备注
write.insert.deduplicatefalsetrue默认 Insert 模式 会去重,关闭后每次 flush 都会直接写独立的 parquet

Hive 查询

打包

第一步是打包 hudi-flink-bundle_2.11-0.9.0.jarhudi-flink-bundle module pom.xml 默认将 Hive 相关的依赖 scope 设置为 provided, 如果想打入 Hive 的依赖,需要显示指定 Profile 为 flink-bundle-shade-hive。执行以下命令打入 Hive 依赖:

# Maven 打包命令
mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2

# 如果是 hive3 需要使用 profile -Pflink-bundle-shade-hive3
# 如果是 hive1 需要使用 profile -Pflink-bundle-shade-hive1
note

Hive1.x 现在只能实现同步 metadata 到 Hive,而无法使用 Hive 查询,如需查询可使用 Spark 查询 Hive 外表的方法查询。

note

使用 -Pflink-bundle-shade-hive x,需要修改 Profile 中 Hive 的版本为集群对应版本(只需修改 Profile 里的 Hive 版本)。修改位置为 packaging/hudi-flink-bundle/pom.xml 最下面的对应 Profile 段,找到后修改 Profile 中的 Hive 版本为对应版本即可。

Hive 环境准备

  1. 第一步是将 hudi-hadoop-mr-bundle.jar 放到 Hive中。 在 Hive 的根目录下创建 auxlib/ 文件夹,把 hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar 移入到 auxlibhudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar 可以在 packaging/hudi-hadoop-mr-bundle/target 目录下拷贝。

  2. 第二步是开启 Hive 相关的服务。Flink SQL Client 远程连接 Hive 的时候,要求 Hive 的 hive metastore and hiveserver2 两个服务都开启且需要记住端口号。 服务开启的方法如下:

# 启动 Hive metastore 和 hiveserver2
nohup ./bin/hive --service metastore &
nohup ./bin/hive --service hiveserver2 &

# 每次更新了 /auxlib 下的 jar 包都需要重启上述两个服务

Hive 配置模版

Flink hive sync 现在支持两种 hive sync mode,分别是 hmsjdbc。其中 hms 模式只需要配置 Metastore uris;而 jdbc 模式需要同时 配置 JDBC 属性 和 Metastore uris,具体配置模版如下:

-- hms mode 配置模版
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'oss://vvr-daily/hudi/t1',
'table.type' = 'COPY_ON_WRITE', --MERGE_ON_READ 方式在没生成 parquet 文件前,Hive 不会有输出
'hive_sync.enable' = 'true', -- Required。开启 Hive 同步功能
'hive_sync.mode' = 'hms' -- Required。将 hive sync mode 设置为 hms, 默认 jdbc
'hive_sync.metastore.uris' = 'thrift://ip:9083' -- Required。metastore 的端口
);


-- jdbc mode 配置模版
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'oss://vvr-daily/hudi/t1',
'table.type' = 'COPY_ON_WRITE', --MERGE_ON_READ 方式在没生成 parquet 文件前,Hive 不会有输出
'hive_sync.enable' = 'true', -- Required。开启 Hive 同步功能
'hive_sync.mode' = 'hms' -- Required。将 hive sync mode 设置为 hms, 默认 jdbc
'hive_sync.metastore.uris' = 'thrift://ip:9083' -- Required。metastore 的端口
'hive_sync.jdbc_url'='jdbc:hive2://ip:10000', -- required。hiveServer 的端口
'hive_sync.table'='t1', -- required。hive 新建的表名
'hive_sync.db'='testDB', -- required。hive 新建的数据库名
'hive_sync.username'='root', -- required。HMS 用户名
'hive_sync.password'='your password' -- required。HMS 密码
);

Hive 查询

使用 beeline 查询时,需要手动设置:

set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

与其他包冲突

当 Flink lib/ 下有 flink-sql-connector-hive-xxx.jar 时,会出现hive包冲突,解决方法是在 Install 时,另外再指定一个 Profile:-Pinclude-flink-sql-connector-hive, 同时删除 Flink lib/ 下的 flink-sql-connector-hive-xxx.jar,新的 Install 的命令如下:

# Maven 打包命令
mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2 -Pinclude-flink-sql-connector-hive

Presto 查询

Hive 同步

首先,参考 Hive 查询 过程,完成 Hive 元数据的同步。

Presto 环境配置

  1. 根据 Presto 配置文档 来配置 Presto。
  2. 根据下面的配置,在 /presto-server-0.2xxx/etc/catalog/hive.properties 中配置 Hive catalog:
connector.name=hive-hadoop2
hive.metastore.uri=thrift://xxx.xxx.xxx.xxx:9083
hive.config.resources=.../hadoop-2.x/etc/hadoop/core-site.xml,.../hadoop-2.x/etc/hadoop/hdfs-site.xml

Presto 查询

通过 presto-cli 连接 Hive metastore 开启查询。 presto-cli 的设置参考 presto-cli 配置:

# 连接 presto server 的命令
./presto --server xxx.xxx.xxx.xxx:9999 --catalog hive --schema default
note
  1. Presto-server-0.2445 版本较低,在查 MOR 表的 rt 表时,会出现包冲突,正在解决中
  2. Presto-server-xxx 的版本 < 0.233 时,hudi-presto-bundle.jar 需要手动导入到 {presto_install_dir}/plugin/hive-hadoop2/ 中。

离线 Compaction

MERGE_ON_READ 表的 compaction 默认是打开的,策略是 5 个 delta commits 执行一次压缩。因为压缩操作比较耗费内存,和写流程放在同一个 pipeline,在数据量比较大 的时候(10w+/s),容易干扰写流程,此时采用离线定时任务的方式执行 compaction 任务更稳定。

note

一个 compaction 任务的执行包括两部分:1. 生成 compaction plan; 2. 执行对应的 compaction plan。其中,第一步生成 compaction plan 的过程推荐由写任务定时触发, 写参数 compaction.schedule.enable 默认开启。

离线 compaction 需要手动执行 Flink 任务,程序入口为: hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jarorg.apache.hudi.sink.compact.HoodieFlinkCompactor

# 命令行执行方式
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.11-0.9.0.jar --path hdfs://xxx:9000/table

参数

名称Required默认值备注
--pathfrue--目标 Hudi 表的路径
--compaction-max-memoryfalse100压缩时 log 数据的索引 HashMap 的内存大小,默认 100MB,内存足够时,可以调大
--schedulefalsefalse是否要执行 schedule compaction 的操作,当写流程还在持续写入表数据的时候,开启这个参数有丢失查询数据的风险,所以开启该参数一定要保证当前没有任务往表里写数据,写任务的 compaction plan 默认是一直 schedule 的,除非手动关闭(默认 5 个 delta commits 一次 compaction)
--seqfalseLIFO执行压缩任务的顺序,默认是从最新的 compaction plan 开始执行,可选值:LIFO :从最新的 plan 开始执行; FIFO:从最老的 plan 开始执行

写入限流

针对将全量数据(百亿数量级)和增量先同步到 Kafka,再通过 Flink 流式消费的方式将库表数据直接导成 Hudi 表的需求,因为直接消费全量数据:量大 (吞吐高)、乱序严重(写入的 partition 随机),会导致写入性能退化,出现吞吐毛刺等情况,这时候可以开启限速参数,保证流量平稳写入。

参数

名称Required默认值备注
write.rate.limitfalse0默认关闭限速

从这开始下一步?

您也可以通过自己构建hudi来快速开始, 并在spark-shell命令中使用--jars <path to hudi_code>/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.1?-*.*.*-SNAPSHOT.jar, 而不是--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0

这里我们使用Spark演示了Hudi的功能。但是,Hudi可以支持多种存储类型/视图,并且可以从Hive,Spark,Presto等查询引擎中查询Hudi数据集。 我们制作了一个基于Docker设置、所有依赖系统都在本地运行的演示视频, 我们建议您复制相同的设置然后按照这里的步骤自己运行这个演示。 另外,如果您正在寻找将现有数据迁移到Hudi的方法,请参考迁移指南