Flink 指南
本指南提供了使用 Flink SQL 操作 Hudi 的文档。阅读本指南,您可以学习如何快速开始使用 Flink 读写 Hudi,同时对配置和任务优化有更深入的了解:
- 快速开始 :通过阅读 快速开始,你可以快速开始使用 Flink sql client 去读写 Hudi
- 配置 :对于 Flink 配置,使用
$FLINK_HOME/conf/flink-conf.yaml
来配置。 对于任意一个作业的配置,通过表参数来设置 - 写功能 :Flink 支持多种写功能用例,例如 离线批量导入,全量接增量,Changelog 模式,Insert 模式 和 离线 Compaction
- 查询功能 :Flink 支持多种查询功能用例,例如 Hive 查询, Presto 查询
- 优化 :针对 Flink 读写 Hudi 的操作,本指南提供了一些优化建议,例如 内存优化 和 写入限流
快速开始
安装
我们推荐使用 Flink Sql Client 来读写 Hudi,因为 Flink sql client 对于 SQL 用户来说更容易上手。
步骤1 下载 Flink jar
我们推荐使用 Flink-1.12.x 来读写 Hudi。 你可以按照 Flink 安装文档 的指导来安装 Flink。 hudi-flink-bundle.jar
使用的是 scala 2.11,所以我们推荐 Flink-1.12.x 配合 scala 2.11 来使用。
步骤2 启动 Flink 集群
在 Hadoop 环境下启动 standalone 的 Flink 集群。 在你启动 Flink 集群前,我们推荐先配置如下参数:
- 在
$FLINK_HOME/conf/flink-conf.yaml
中添加配置:taskmanager.numberOfTaskSlots: 4
- 在
$FLINK_HOME/conf/flink-conf.yaml
中,根据数据量大小和集群大小来添加其他的 Flink 配置 - 在
$FLINK_HOME/conf/workers
中添加4核localhost
来保证我们本地集群中有4个 workers
启动集群:
# HADOOP_HOME 是 Hadoop 的根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# 启动 Flink standalone 集群
./bin/start-cluster.sh
步骤3 启动 Flink SQL client
Hudi 将 Flink 板块单独打包为 hudi-flink-bundle.jar
,该 Jar 包需要在启动的时候加载。
你可以在 hudi-source-dir/packaging/hudi-flink-bundle
下手动的打包这个 Jar 包,或者从 Apache Official Repository
中下载。
启动 Flink SQL Client:
# HADOOP_HOME 是 Hadoop 的根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell
小提示:
- 为了兼容大部分的对象存储,我们推荐使用 Hadoop 2.9 x+的 Hadoop 版本
- flink-parquet 和 flink-avro 格式已经打包在 hudi-flink-bundle.jar 中了
根据下面的不同功能来设置表名,存储路径和操作类型。 Flink SQL Client 是逐行执行 SQL 的。
插入数据
先创建一个 Flink Hudi 表,然后在通过下面的 VALUES
语句往该表中插入数据。
-- 为了更直观的显示结果,推荐把 CLI 的输出结果模式设置为 tableau。
set execution.result-mode=tableau;
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'schema://base-path',
'table.type' = 'MERGE_ON_READ' -- 创建一个 MERGE_ON_READ类型的表,默认是 COPY_ON_ERITE
);
-- 使用 values 语句插入数据
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
查询数据
-- 从 Hudi 表中查询
select * from t1;
该查询语句提供的是 快照读(Snapshot Querying)。 如果想了解更多关于表类型和查询类型的介绍,可以参考文档 表类型和查询类型
更新数据
数据的更新和插入数据类似:
-- 这条语句会更新 key 为 'id1' 的记录
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
需要注意的是:现在使用的存储类型为 Append
。通常我们都是使用 apennd 模式,除非你是第一次创建这个表。 再次 查询数据 就会显示更新后的结果。
每一次的插入操作都会在时间轴上生成一个带时间戳的新的 commit,在元数据字段 _hoodie_commit_time
和同一 _hoodie_record_key
的 age
字段中查看更新。
流式查询
Hudi Flink 也有能力来查询从指定时间戳开始的流式记录集合。 该功能可以使用Hudi的流式查询,只需要提供一个查询开始的时间戳就可以完成查询。如果我们需要的 是指定时间戳后的所有数据,我们就不需要指定结束时间。
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'oss://vvr-daily/hudi/t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- 该参数开启流式查询
'read.streaming.start-commit' = '20210316134557' -- 指定开始的时间戳
'read.streaming.check-interval' = '4' -- 指定检查新的commit的周期,默认是60秒
);
-- 开启流式的查询
select * from t1;
上述的查询会查询出 read.streaming.start-commit
时间戳后的所有数据。该功能的特殊在于可以同时在流和批的 pipeline 上执行。
删除数据
在 流式查询 中使用数据时,Hudi Flink 源还可以接受来自底层数据源的更改日志,然后可以按行级别应用更新和删除。所以,你可以在 Hudi 上同步各种 RDBMS 的近实时快照。
Flink 配置
在使用 Flink 前,你需要在 $FLINK_HOME/conf/flink-conf.yaml
中设置一些全局的 Flink 配置。
并行度
名称 | 默认值 | 类型 | 描述 |
---|---|---|---|
taskmanager.numberOfTaskSlots | 1 | Integer | 单个 TaskManager 可以运行的并行 task 数。我们建议将该值设置为 > 4,实际值需根据数据量进行设置 |
parallelism.default | 1 | Integer | 当用户为指定算子并行度时,会使用这个并行度(默认值是1 )。例如 write.bucket_assign.tasks 没有设置,就会使用这个默认值 |
内存
名称 | 默认值 | 类型 | 描述 |
---|---|---|---|
jobmanager.memory.process.size | (none) | MemorySize | JobManager 的总进程内存大小。代表 JobManager JVM 进程消耗的所有内存,包括总的 Flink 内存、JVM 元空间和 JVM 开销 |
taskmanager.memory.task.heap.size | (none) | MemorySize | TaskExecutor 的堆内存大小。这是为写缓存保留的 JVM 堆内存大小 |
taskmanager.memory.managed.size | (none) | MemorySize | 这是内存管理器管理的堆外内存的大小,用于排序和 RocksDB 状态后端。如果选择 RocksDB 作为状态后端,则需要设置此内存 |
Checkpoint
名称 | 默认值 | 类型 | 描述 |
---|---|---|---|
execution.checkpointing.interval | (none) | Duration | 设置该值的方式为 execution.checkpointing.interval = 150000ms ,其中150000ms = 2.5min。 设置这个参数等同于开启了 Checkpoint |
state.backend | (none) | String | 保存状态信息的状态后端。 我们推荐设置状态后端为 rocksdb :state.backend: rocksdb |
state.backend.rocksdb.localdir | (none) | String | RocksDB 存储状态信息的路径 |
state.checkpoints.dir | (none) | String | Checkpoint 的默认路径,用于在支持 Flink 的文件系统中存储检查点的数据文件和元数据。存储路径必须可从所有参与进程/节点(即所有 TaskManager 和 JobManager)访问,如 HDFS 和 OSS 路径 |
state.backend.incremental | false | Boolean | 选项状态后端是否创建增量检查点。对于增量检查点,只存储与前一个检查点的差异,而不是完整的检查点状态。如果存储状态设置为 rocksdb ,建议打开这个选项 |
表参数
对于单个作业的配置,我们可以在 Flink SQL 语句的 WITH
中设置。
所以,作业级别的配置如下:
内存
我们在内存调优的时候需要先关注 TaskManager 的数量和 内存 配置,以及 write task 的并发(write.tasks: 4
的值),确认每个 write task 能够分配到足够的内存,
再考虑以下相关的内存参数设置。
名称 | 说明 | 默认值 | 备注 |
---|---|---|---|
write.task.max.size | 一个 write task 的最大可用内存。 默认为 1024MB | 1024D | 当前预留给 write buffer 的内存为 write.task.max.size - compaction.max_memory 。 当 write task 的内存 buffer 达到该阈值后会将内存里最大的 buffer flush |
write.batch.size | Flink 的 write task 为了提高写数据效率,会按照写 bucket 提前缓存数据,每个 bucket 的数据在内存达到阈值之前会一直缓存在内存中,当阈值达到会把数据 buffer 传递给 Hudi 的 writer 执行写操作。默认为 64MB | 64D | 推荐使用默认值 |
write.log_block.size | Hudi 的 log writer 在收到 write task 的数据后不会马上 flush 数据,writer 是以 LogBlock 为单位往磁盘刷数据的,在 LogBlock 攒够之前 records 会以序列化字节的形式缓存在 writer 内部。默认为 128MB | 128 | 推荐使用默认值 |
write.merge.max_memory | 当表的类型为 COPY_ON_WRITE ,Hudi 会合并增量数据和 base file 中的数据。增量的数据会缓存在内存的 map 结构里,这个 map 是可溢写的,这个参数控制了 map 可以使用的堆内存大小。 默认为 100MB | 100 | 推荐使用默认值 |
compaction.max_memory | 同 write.merge.max_memory 类似,只是发生在压缩时。默认为 100MB | 100 | 如果是在线 compaction,资源充足时可以开大些,比如 1024MB |
并行度
名称 | 说明 | 默认值 | 备注 |
---|---|---|---|
write.tasks | writer 的并发,每个 writer 顺序写 1 ~N 个 buckets。默认为 4 | 4 | 增加这个并发,对小文件个数没影响 |
write.bucket_assign.tasks | bucket assigner 的并发,默认使用 Flink 默认并发 parallelism.default | parallelism.default | 增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件(小 bucket)数 |
write.index_boostrap.tasks | Index bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 Checkpoint,因此需要设置多一些的 Checkpoint 失败容忍次数。 默认使用 Flink 默认并发 parallelism.default | parallelism.default | 只在 index.bootsrap.enabled 为 true 时生效 |
read.tasks | 读算子的并发(batch 和 stream。默认为 4 | 4 | |
compaction.tasks | 在线 compaction 的并发。默认为 10 | 10 | 在 线 compaction 比较耗费资源,建议使用 离线 compaction |
Compaction
这些参数都只服务于在线 compaction。
通过设置compaction.async.enabled
= false
来关闭在线 compaction,但是 compaction.schedule.enable
仍然建议开启。之后通过离线 compaction
直接执行在线 compaction 产生的 compaction plan。
名称 | 说明 | 默认值 | 备注 |
---|---|---|---|
compaction.schedule.enabled | 是否阶段性生成 compaction plan | true | 建议开启,即使 compaction.async.enabled = false |
compaction.async.enabled | 是否开启异步压缩,MOR 表时默认开启 | true | 通过关闭此参数来关闭 在线 compaction |
compaction.trigger.strategy | 压缩策略 | num_commits | 可选择的策略有 num_commits :达到 N 个 delta commits 时触发 compaction; time_elapsed :距离上次 compaction 超过 N 秒触发 compaction ; num_and_time :NUM_COMMITS 和 TIME_ELAPSED 同时满足; num_or_time :NUM_COMMITS 或者 TIME_ELAPSED 中一个满足 |
compaction.delta_commits | 默认策略,5 个 delta commits 触发一次压缩 | 5 | -- |
compaction.delta_seconds | 默认 1 小时触发一次压缩 | 3600 | -- |
compaction.max_memory | compaction 的 hashMap 可用内存。 默认值为 100MB | 100 | 资源够用的话,建议调整到 1024MB |
compaction.target_io | 每个 compaction plan 的 IO 内存上限 (读和写)。 默认值为 5GB | 5120 | 离 线 compaction 的默认值是 500GB |
内存优化
MOR
- 把 Flink 的状态后端设置为
rocksdb
(默认的in memory
状态后端非常的消耗内存) - 如果内存足够,
compaction.max_memory
可以设置得更大些(默认为100MB
,可以调大到1024MB
) - 关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到
write.task.max.size
所配置的内存大小。 比如 taskManager 的内存是4GB
, 运行了2
个StreamWriteFunction
,那每个 write function 能分到2GB
,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如BucketAssignFunction
)也会消耗一些内存 - 需要关注 compaction 的内存变化。
compaction.max_memory
控制了每个 compaction task 读 log 时可以利用的内存大小。compaction.tasks
控制了 compaction task 的并发
COW
- 把 Flink 的状态后端设置为
rocksdb
(默认的in memory
状态后端非常的消耗内存) - 同时调大
write.task.max.size
和write.merge.max_memory
(默认值分别是1024MB
和100MB
,可以调整为2014MB
和1024MB
) - 关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到
write.task.max.size
所配置的内存大小。 比如 taskManager 的内存是4GB
, 运行了2
个StreamWriteFunction
,那每个 write function 能分到2GB
,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如BucketAssignFunction
)也会消耗一些内存
离线批量导入
针对存量数据导入的需求,如果存量数据来源于其他数据源,可以使用离线批量导入功能(bulk_insert
),快速将存量数据导入 Hudi。
bulk_insert
省去了 avro 的序列化以及数据的 merge 过程,后续也不会再有去重操作。所以,数据的唯一性需要自己来保证。
bulk_insert
在 batch execution mode
模式下执行更加高效。 batch execution mode
模式默认会按照 partition path 排序输入消息再写入 Hudi,
避免 file handle 频繁切换导致性能下降。
bulk_insert
的 write tasks 的并发是通过参数 write.tasks
来指定,并发的数量会影响到小文件的数量,理论上,bulk_insert
的 write tasks 的并发数就是划分的 bucket 数,
当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 回滚到新的文件句柄,所以最后:写文件数量 >= write.bucket_assign.tasks
。
参数
名称 | Required | 默认值 | 备注 |
---|---|---|---|
write.operation | true | upsert | 开启 bulk_insert 功能 |
write.tasks | false | 4 | bulk_insert write tasks 的并发,最后的文件数 >= write.bucket_assign.tasks |
write.bulk_insert.shuffle_by_partition | false | true | 是否将数据按照 partition 字段 shuffle 后,再通过 write task 写入,开启该参数将减少小文件的数量,但是有数据倾斜的风险 |
write.bulk_insert.sort_by_partition | false | true | 是否将数据线按照 partition 字段排序后,再通过 write task 写入,当一个 write task 写多个 partition时,开启可以减少小文件数量 |
write.sort.memory | false | 128 | sort 算子的可用 managed memory(单位 MB)。默认为 128 MB |
全量接增量
针对全量数据导入后,接增量的需求。如果已经有全量的离线 Hudi 表,需要接上实时写入,并且保证数据不重复,可以开启 全量接增量(index bootstrap
)功能。
如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启 写入限流 )。
参数
名称 | Required | 默认值 | 备注 |
---|---|---|---|
index.bootstrap.enabled | true | false | 开启 index bootstrap 索引加载后,会将已存在的 Hudi 表的数据一次性加载到 state 中 |
index.partition.regex | false | * | 设置正则表达式进行分区筛选,默认为加载全部分区 |
使用流程
CREATE TABLE
创建和 Hudi 表对应的语句 ,注意table.type
必须正确- 设置
index.bootstrap.enabled
=true
开启索引加载功能 - 在
flink-conf.yaml
中设置 Checkpoint 失败容忍 :execution.checkpointing.tolerable-failed-checkpoints = n
(取决于checkpoint 调度次数) - 等待第一次 Checkpoint 完成,表示索引加载完成
- 索引加载完成后可以退出并保存 savepoint(也可以直接用 externalized checkpoint)
- 重启任务,将
index.bootstrap.enable
设置为false
,参数配置到合适的大小
- 索引加载是阻塞式,所以在索引加载过程中 Checkpoint 无法完成
- 索引加载由数据流触发,需要确保每个 partition 都至少有1条数据,即上游 source 有数据进来
- 索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索
finish loading the index under partition
和Load record form file
日志内容来观察索引加载的进度 - 第一次 Checkpoint 成功就表示索引已经加载完成,后续从 Checkpoint 恢复时无需再次加载索引
Changelog 模式
针对使用 Hudi 保留消息的所有变更(I / -U / U / D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算)的需求,Hudi 的 MOR 表 通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。
参数
名称 | Required | 默认值 | 备注 |
---|---|---|---|
changelog.enabled | false | false | 默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更 |
批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。
设置 changelog.enable
为 true
后,中间的变更也只是 best effort
:异步的压缩任务会将中间变更合并成 1
条,所以如果流读消费不够及时,被压缩后
只能读到最后一条记录。当然,通过调整压缩的缓存时间可以预留一定的时间缓冲给 reader,比如调整压缩的两个参数:compaction.delta_commits
and compaction.delta_seconds
。
Insert 模式
当前 Hudi 对于 Insert 模式
默认会采用小文件策略:MOR 会追加写 avro log 文件,COW 会不断合并之前的 parquet 文件(并且增量的数据会去重),这样会导致性能下降。
如果想关闭文件合并,可以设置 write.insert.deduplicate
为 false
。 关闭后,不会有任何的去重行为,每次 flush 都是直接写独立的 parquet(MOR 表也会直接写 parquet)。
参数
名称 | Required | 默认值 | 备注 |
---|---|---|---|
write.insert.deduplicate | false | true | 默认 Insert 模式 会去重,关闭后每次 flush 都会直接写独立的 parquet |
Hive 查询
打包
第一步是打包 hudi-flink-bundle_2.11-0.9.0.jar
。 hudi-flink-bundle
module pom.xml 默认将 Hive 相关的依赖 scope 设置为 provided,
如果想打入 Hive 的依赖,需要显示指定 Profile 为 flink-bundle-shade-hive
。执行以下命令打入 Hive 依赖:
# Maven 打包命令
mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
# 如果是 hive3 需要使用 profile -Pflink-bundle-shade-hive3
# 如果是 hive1 需要使用 profile -Pflink-bundle-shade-hive1
Hive1.x 现在只能实现同步 metadata 到 Hive,而无法使用 Hive 查询,如需查询可使用 Spark 查询 Hive 外表的方法查询。
使用 -Pflink-bundle-shade-hive x,需要修改 Profile 中 Hive 的版本为集群对应版本(只需修改 Profile 里的 Hive 版本)。修改位置为 packaging/hudi-flink-bundle/pom.xml
最下面的对应 Profile 段,找到后修改 Profile 中的 Hive 版本为对应版本即可。
Hive 环境准备
-
第一步是将
hudi-hadoop-mr-bundle.jar
放到 Hive中。 在 Hive 的根目录下创建auxlib/
文件夹,把hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar
移入到auxlib
。hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar
可以在packaging/hudi-hadoop-mr-bundle/target
目录下拷贝。 -
第二步是开启 Hive 相关的服务。Flink SQL Client 远程连接 Hive 的时候,要求 Hive 的
hive metastore
andhiveserver2
两个服务都开启且需要记住端口号。 服务开启的方法如下:
# 启动 Hive metastore 和 hiveserver2
nohup ./bin/hive --service metastore &
nohup ./bin/hive --service hiveserver2 &
# 每次更新了 /auxlib 下的 jar 包都需要重启上述两个服务
Hive 配置模版
Flink hive sync 现在支持两种 hive sync mode,分别是 hms
和 jdbc
。其中 hms
模式只需要配置 Metastore uris;而 jdbc
模式需要同时
配置 JDBC 属性 和 Metastore uris,具体配置模版如下:
-- hms mode 配置模版
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'oss://vvr-daily/hudi/t1',
'table.type' = 'COPY_ON_WRITE', --MERGE_ON_READ 方式在没生成 parquet 文件前,Hive 不会有输出
'hive_sync.enable' = 'true', -- Required。开启 Hive 同步功能
'hive_sync.mode' = 'hms' -- Required。将 hive sync mode 设置为 hms, 默认 jdbc
'hive_sync.metastore.uris' = 'thrift://ip:9083' -- Required。metastore 的端口
);
-- jdbc mode 配置模版
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'oss://vvr-daily/hudi/t1',
'table.type' = 'COPY_ON_WRITE', --MERGE_ON_READ 方式在没生成 parquet 文件前,Hive 不会有输出
'hive_sync.enable' = 'true', -- Required。开启 Hive 同步功能
'hive_sync.mode' = 'hms' -- Required。将 hive sync mode 设置为 hms, 默认 jdbc
'hive_sync.metastore.uris' = 'thrift://ip:9083' -- Required。metastore 的端口
'hive_sync.jdbc_url'='jdbc:hive2://ip:10000', -- required。hiveServer 的端口
'hive_sync.table'='t1', -- required。hive 新建的表名
'hive_sync.db'='testDB', -- required。hive 新建的数据库名
'hive_sync.username'='root', -- required。HMS 用户名
'hive_sync.password'='your password' -- required。HMS 密码
);
Hive 查询
使用 beeline 查询时,需要手动设置:
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
与其他包冲突
当 Flink lib/ 下有 flink-sql-connector-hive-xxx.jar
时,会出现hive包冲突,解决方法是在 Install 时,另外再指定一个 Profile:-Pinclude-flink-sql-connector-hive
,
同时删除 Flink lib/ 下的 flink-sql-connector-hive-xxx.jar
,新的 Install 的命令如下:
# Maven 打包命令
mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2 -Pinclude-flink-sql-connector-hive