Flink 指南
本指南提供了使用 Flink SQL 操作 Hudi 的文档。阅读本指南,您可以学习如何快速开始使用 Flink 读写 Hudi,同时对配置和任务优化有更深入的了解:
- 快速开始 :通过阅读 快速开始,你可以快速开始使用 Flink sql client 去读写 Hudi
- 配置 :对于 Flink 配置,使用
$FLINK_HOME/conf/flink-conf.yaml
来配置。 对于任意一个作业的配置,通过表参数来设置 - 写功能 :Flink 支持多种写功能用例,例如 离线批量导入,全量接增量,Changelog 模式,Insert 模式 和 离线 Compaction
- 查询功能 :Flink 支持多种查询功能用例,例如 Hive 查询, Presto 查询
- 优化 :针对 Flink 读写 Hudi 的操作,本指南提供了一些优化建议,例如 内存优化 和 写入限流
快速开始
安装
我们推荐使用 Flink Sql Client 来读写 Hudi,因为 Flink sql client 对于 SQL 用户来说更容易上手。
步骤1 下载 Flink jar
我们推荐使用 Flink-1.12.x 来读写 Hudi。 你可以按照 Flink 安装文档 的指导来安装 Flink。 hudi-flink-bundle.jar
使用的是 scala 2.11,所以我们推荐 Flink-1.12.x 配合 scala 2.11 来使用。
步骤2 启动 Flink 集群
在 Hadoop 环境下启动 standalone 的 Flink 集群。 在你启动 Flink 集群前,我们推荐先配置如下参数:
- 在
$FLINK_HOME/conf/flink-conf.yaml
中添加配置:taskmanager.numberOfTaskSlots: 4
- 在
$FLINK_HOME/conf/flink-conf.yaml
中,根据数据量大小和集群大小来添加其他的 Flink 配置 - 在
$FLINK_HOME/conf/workers
中添加4核localhost
来保证我们本地集群中有4个 workers
启动集群:
# HADOOP_HOME 是 Hadoop 的根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# 启动 Flink standalone 集群
./bin/start-cluster.sh
步骤3 启动 Flink SQL client
Hudi 将 Flink 板块单独打包为 hudi-flink-bundle.jar
,该 Jar 包需要在启动的时候加载。
你可以在 hudi-source-dir/packaging/hudi-flink-bundle
下手动的打包这个 Jar 包,或者从 Apache Official Repository
中下载。
启动 Flink SQL Client:
# HADOOP_HOME 是 Hadoop 的根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell
小提示:
- 为了兼容大部分的对象存储,我们推荐使用 Hadoop 2.9 x+的 Hadoop 版本
- flink-parquet 和 flink-avro 格式已经打包在 hudi-flink-bundle.jar 中了
根据下面的不同功能来设置表名,存储路径和操作类型。 Flink SQL Client 是逐行执行 SQL 的。
插入数据
先创建一个 Flink Hudi 表,然后在通过下面的 VALUES
语句往该表中插入数据。
-- 为了更直观的显示结果,推荐把 CLI 的输出结果模式设置为 tableau。
set execution.result-mode=tableau;
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'schema://base-path',
'table.type' = 'MERGE_ON_READ' -- 创建一个 MERGE_ON_READ类型的表,默认是 COPY_ON_ERITE
);
-- 使用 values 语句插入数据
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
查询数据
-- 从 Hudi 表中查询
select * from t1;
该查询语句提供的是 快照读(Snapshot Querying)。 如果想了解更多关于表类型和查询类型的介绍,可以参考文档 表类型和查询类型