Flink 指南
本指南提供了使用 Flink SQL 操作 Hudi 的文档。阅读本指南,您可以学习如何快速开始使用 Flink 读写 Hudi,同时对配置和任务优化有更深入的了解:
- 快速开始 :通过阅读 快速开始,你可以快速开始使用 Flink sql client 去读写 Hudi
- 配置 :对于 Flink 配置,使用
$FLINK_HOME/conf/flink-conf.yaml
来配置。 对于任意一个作业的配置,通过表参数来设置 - 写功能 :Flink 支持多种写功能用例,例如 离线批量导入,全量接增量,Changelog 模式,Insert 模式 和 离线 Compaction
- 查询功能 :Flink 支持多种查询功能用例,例如 Hive 查询, Presto 查询
- 优化 :针对 Flink 读写 Hudi 的操作,本指南提供了一些优化建议,例如 内存优化 和 写入限流
快速开始
安装
我们推荐使用 Flink Sql Client 来读写 Hudi,因为 Flink sql client 对于 SQL 用户来说更容易上手。
步骤1 下载 Flink jar
我们推荐使用 Flink-1.12.x 来读写 Hudi。 你可以按照 Flink 安装文档 的指导来安装 Flink。 hudi-flink-bundle.jar
使用的是 scala 2.11,所以我们推荐 Flink-1.12.x 配合 scala 2.11 来使用。
步骤2 启动 Flink 集群
在 Hadoop 环境下启动 standalone 的 Flink 集群。 在你启动 Flink 集群前,我们推荐先配置如下参数:
- 在
$FLINK_HOME/conf/flink-conf.yaml
中添加配置:taskmanager.numberOfTaskSlots: 4
- 在
$FLINK_HOME/conf/flink-conf.yaml
中,根据数据量大小和集群大小来添加其他的 Flink 配置 - 在
$FLINK_HOME/conf/workers
中添加4核localhost
来保证我们本地集群中有4个 workers
启动集群:
# HADOOP_HOME 是 Hadoop 的根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# 启动 Flink standalone 集群
./bin/start-cluster.sh
步骤3 启动 Flink SQL client
Hudi 将 Flink 板块单独打包为 hudi-flink-bundle.jar
,该 Jar 包需要在启动的时候加载。
你可以在 hudi-source-dir/packaging/hudi-flink-bundle
下手动的打包这个 Jar 包,或者从 Apache Official Repository
中下载。
启动 Flink SQL Client:
# HADOOP_HOME 是 Hadoop 的根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell
小提示:
- 为了兼容大部分的对象存储,我们推荐使用 Hadoop 2.9 x+的 Hadoop 版本
- flink-parquet 和 flink-avro 格式已经打包在 hudi-flink-bundle.jar 中了
根据下面的不同功能来设置表名,存储路径和操作类型。 Flink SQL Client 是逐行执行 SQL 的。
插入数据
先创建一个 Flink Hudi 表,然后在通过下面的 VALUES
语句往该表中插入数据。
-- 为了更直观的显示结果,推荐把 CLI 的输出结果模式设置为 tableau。
set execution.result-mode=tableau;
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'schema://base-path',
'table.type' = 'MERGE_ON_READ' -- 创建一个 MERGE_ON_READ类型的表,默认是 COPY_ON_ERITE
);
-- 使用 values 语句插入数据
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
查询数据
-- 从 Hudi 表中查询
select * from t1;
该查询语句提供的是 快照读(Snapshot Querying)。 如果想了解更多关于表类型和查询类型的介绍,可以参考文档 表类型和查询类型
更新数据
数据的更新和插入数据类似:
-- 这条语句会更新 key 为 'id1' 的记录
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
需要注意的是:现在使用的存储类型为 Append
。通常我们都是使用 apennd 模式,除非你是第一次创建这个表。 再次 查询数据 就会显示更新后的结果。
每一次的插入操作都会在时间轴上生成一个带时间戳的新的 commit,在元数据字段 _hoodie_commit_time
和同一 _hoodie_record_key
的 age
字段中查看更新。
流式查询
Hudi Flink 也有能力来查询从指定时间戳开始的流式记录集合。 该功能可以使用Hudi的流式查询,只需要提供一个查询开始的时间戳就可以完成查询。如果我们需要的 是指定时间戳后的所有数据,我们就不需要指定结束时间。
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'oss://vvr-daily/hudi/t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- 该参数开启流式查询
'read.streaming.start-commit' = '20210316134557' -- 指定开始的时间戳
'read.streaming.check-interval' = '4' -- 指定检查新的commit的周期,默认是60秒
);
-- 开启流式的查询
select * from t1;
上述的查询会查询出 read.streaming.start-commit
时间戳后的所有数据。该功能的特殊在于可以同时在流和批的 pipeline 上执行。
删除数据
在 流式查询 中使用数据时,Hudi Flink 源还可以接受来自底层数据源的更改日志,然后可以按行级别应用更新和删除。所以,你可以在 Hudi 上同步各种 RDBMS 的近实时快照。
Flink 配置
在使用 Flink 前,你需要在 $FLINK_HOME/conf/flink-conf.yaml
中设置一些全局的 Flink 配置。
并行度
名称 | 默认值 | 类型 | 描述 |
---|---|---|---|
taskmanager.numberOfTaskSlots | 1 | Integer | 单个 TaskManager 可以运行的并行 task 数。我们建议将该值设置为 > 4,实际值需根据数据量进行设置 |
parallelism.default | 1 | Integer | 当用户为指定算子并行度时,会使用这个并行度(默认值是1)。例如 write.bucket_assign.tasks 没有设置,就会使用这个默认值 |
内存
名称 | 默认值 | 类型 | 描述 |
---|---|---|---|
jobmanager.memory.process.size | (none) | MemorySize | JobManager 的总进程内存大小。代表 JobManager JVM 进程消耗的所有内存,包括总的 Flink 内存、JVM 元空间和 JVM 开销 |
taskmanager.memory.task.heap.size | (none) | MemorySize | TaskExecutor 的堆内存大小。这是为写缓存保留的 JVM 堆内存大小 |
taskmanager.memory.managed.size | (none) | MemorySize | 这是内存管理器管理的堆外内存的大小,用于排序和 RocksDB 状态后端。如果选择 RocksDB 作为状态后端,则需要设置此内存 |
Checkpoint
名称 | 默认值 | 类型 | 描述 |
---|---|---|---|
execution.checkpointing.interval | (none) | Duration | 设置该值的方式为 execution.checkpointing.interval = 150000ms ,其中150000ms = 2.5min。 设置这个参数等同于开启了 Checkpoint |
state.backend | (none) | String | 保存状态信息的状态后端。 我们推荐设置状态后端为 rocksdb :state.backend: rocksdb |
state.backend.rocksdb.localdir | (none) | String | RocksDB 存储状态信息的路径 |
state.checkpoints.dir | (none) | String | Checkpoint 的默认路径,用于在支持 Flink 的文件系统中存储检查点的数据文件和元数据。存储路径必须可从所有参与进程/节点(即所有 TaskManager 和 JobManager)访问,如 HDFS 和 OSS 路径 |
state.backend.incremental | false | Boolean | 选项状态后端是否创建增量检查点。对于增量检查点,只存储与前一个检查点的差异,而不是完整的检查点状态。如果存储状态设置为 rocksdb ,建议打开这个选项 |
表参数
对于单个作业的配置,我们可以在 Flink SQL 语句的 WITH
中设置。
所以,作业级别的配置如下:
内存
我们在内存调 优的时候需要先关注 TaskManager 的数量和 内存 配置,以及 write task 的并发(write.tasks: 4
的值),确认每个 write task 能够分配到足够的内存,
再考虑以下相关的内存参数设置。
名称 | 说明 | 默认值 | 备注 |
---|---|---|---|
write.task.max.size | 一个 write task 的最大可用内存。 默认为 1024MB | 1024D | 当前预留给 write buffer 的内存为 write.task.max.size - compaction.max_memory 。 当 write task 的内存 buffer 达到该阈值后会将内存里最大的 buffer flush |
write.batch.size | Flink 的 write task 为了提高写数据效率,会按照写 bucket 提前缓存数据,每个 bucket 的数据在内存达到阈值之前会一直缓存在内存中,当阈值达到会把数据 buffer 传递给 Hudi 的 writer 执行写操作。默认为 64MB | 64D | 推荐使用默认值 |
write.log_block.size | Hudi 的 log writer 在收到 write task 的数据后不会马上 flush 数据,writer 是以 LogBlock 为单位往磁盘刷数据的,在 LogBlock 攒够之前 records 会以序列化字节的形式缓存在 writer 内部。默认为 128MB | 128 | 推荐使用默认值 |
write.merge.max_memory | 当表的类型为 COPY_ON_WRITE ,Hudi 会合并增量数据和 base file 中的数据。增量的数据会缓存在内存的 map 结构里,这个 map 是可溢写的,这个参数控制了 map 可以使用的堆内存大小。 默认为 100MB | 100 | 推荐使用默认值 |
compaction.max_memory | 同 write.merge.max_memory 类似,只是发生在压缩时。默认为 100MB | 100 | 如果是在线 compaction,资源充足时可以开大些,比如 1024MB |
并行度
名称 | 说明 | 默认值 | 备注 |
---|---|---|---|
write.tasks | writer 的并发,每个 writer 顺序写 1 ~N 个 buckets。默认为 4 | 4 | 增加这个并发,对小文件个数没影响 |
write.bucket_assign.tasks | bucket assigner 的并发,默认使用 Flink 默认并发 parallelism.default | parallelism.default | 增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件(小 bucket)数 |
write.index_boostrap.tasks | Index bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 Checkpoint,因此需要设置多一些的 Checkpoint 失败容忍次数。 默认使用 Flink 默认并发 parallelism.default | parallelism.default | 只在 index.bootsrap.enabled 为 true 时生效 |
read.tasks | 读算子的并发(batch 和 stream。默认为 4 | 4 | |
compaction.tasks | 在线 compaction 的并发。默认为 10 | 10 | 在线 compaction 比较耗费资源,建议使用 离线 compaction |
Compaction
这些参数都只服务于在线 compaction。
通过设置compaction.async.enabled
= false
来关闭在线 compaction,但是 compaction.schedule.enable
仍然建议开启。之后通过离线 compaction
直接执行在线 compaction 产生的 compaction plan。
名称 | 说明 | 默认值 | 备注 |
---|---|---|---|
compaction.schedule.enabled | 是否阶段性生成 compaction plan | true | 建议开启,即使 compaction.async.enabled = false |
compaction.async.enabled | 是否开启异步压缩,MOR 表时默认开启 | true | 通过关闭此参数来关闭 在线 compaction |
compaction.trigger.strategy | 压缩策略 | num_commits | 可选择的策略有 num_commits :达到 N 个 delta commits 时触发 compaction; time_elapsed :距离上次 compaction 超过 N 秒触发 compaction ; num_and_time :NUM_COMMITS 和 TIME_ELAPSED 同时满足; num_or_time :NUM_COMMITS 或者 TIME_ELAPSED 中一个满足 |
compaction.delta_commits | 默认策略,5 个 delta commits 触发一次压缩 | 5 | -- |
compaction.delta_seconds | 默认 1 小时触发一次压缩 | 3600 | -- |
compaction.max_memory | compaction 的 hashMap 可用内存。 默认值为 100MB | 100 | 资源够用的话,建议调整到 1024MB |
compaction.target_io | 每个 compaction plan 的 IO 内存上限 (读和写)。 默认值为 5GB | 5120 | 离线 compaction 的默认值是 500GB |
内存优化
MOR
- 把 Flink 的状态后端设置为
rocksdb
(默认的in memory
状态后端非常的消耗内存) - 如果内存足够,
compaction.max_memory
可以设置得更大些(默认为100MB
,可以调大到1024MB
) - 关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到
write.task.max.size
所配置的内存大小。 比如 taskManager 的内存是4GB
, 运行了2
个StreamWriteFunction
,那每个 write function 能分到2GB
,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如BucketAssignFunction
)也会消耗一些内存 - 需要关注 compaction 的内存变化。
compaction.max_memory
控制了每个 compaction task 读 log 时可以利用的内存大小。compaction.tasks
控制了 compaction task 的并发
COW
- 把 Flink 的状态后端设置为
rocksdb
(默认的in memory
状态后端非常的消耗内存) - 同时调大
write.task.max.size
和write.merge.max_memory
(默认值分别是1024MB
和100MB
,可以调整为2014MB
和1024MB
) - 关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到
write.task.max.size
所配置的内存大小。 比如 taskManager 的内存是4GB