Flink Guide
This page introduces the Flink-Hudi integration and shows how Flink brings the power of streaming into Hudi. It helps you get started with Flink on Hudi quickly and covers the different modes for reading and writing Hudi tables with Flink:
- Quick Start : Read the Quick Start to get started quickly with the Flink SQL client and write to (or read from) Hudi.
- Configuration : For global configuration, set options in $FLINK_HOME/conf/flink-conf.yaml. For per-job configuration, set options through Table Option.
- Writing Data : Flink supports different modes for writing, such as CDC Ingestion, Bulk Insert, Index Bootstrap, Changelog Mode and Append Mode.
- Querying Data : Flink supports different modes for reading, such as Streaming Query and Incremental Query.
- Tuning : For write/read tasks, this guide gives some tuning suggestions, such as Memory Optimization and Write Rate Limit.
- Optimization: Offline compaction is supported; see Offline Compaction.
- Query Engines: Besides Flink, many other engines are integrated: Hive Query, Presto Query.
Quick Start
Setup
- Flink SQL
- DataStream API
We use the Flink SQL Client because it is a good quick start tool for SQL users.
Step.1 download Flink jar
Hudi works with Flink 1.13, Flink 1.14 and Flink 1.15. You can follow the instructions here for setting up Flink. Then choose the Hudi-Flink bundle jar that matches your Flink and Scala versions:
hudi-flink1.13-bundle
hudi-flink1.14-bundle
hudi-flink1.15-bundle
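As a rough sketch, the choice of bundle can be scripted from the Flink version you run. The helper name hudi_bundle_for is hypothetical and only covers the three versions listed above; it is not part of Hudi or Flink.

```shell
#!/usr/bin/env bash
# Hypothetical helper: map a Flink version (for example 1.14.6)
# to the matching bundle artifact name from the list above.
hudi_bundle_for() {
  case "$1" in
    1.13|1.13.*) echo "hudi-flink1.13-bundle" ;;
    1.14|1.14.*) echo "hudi-flink1.14-bundle" ;;
    1.15|1.15.*) echo "hudi-flink1.15-bundle" ;;
    *) echo "unsupported Flink version: $1" >&2; return 1 ;;
  esac
}

hudi_bundle_for "1.14.6"   # prints hudi-flink1.14-bundle
```

Any other version makes the helper fail with a non-zero status, so a setup script can stop early instead of loading a mismatched jar.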
Step.2 start Flink cluster
Start a standalone Flink cluster within a Hadoop environment. Before you start the cluster, we suggest configuring it as follows:
- In $FLINK_HOME/conf/flink-conf.yaml, add the config option taskmanager.numberOfTaskSlots: 4
- In $FLINK_HOME/conf/flink-conf.yaml, add other global configurations according to the characteristics of your task
- In $FLINK_HOME/conf/workers, add the item localhost as 4 lines so that there are 4 workers on the local cluster
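The suggested configuration can be applied with a few shell commands. This is a minimal sketch, assuming FLINK_HOME points at your Flink installation; the fallback to a scratch directory is only there so the snippet can be dry-run without touching a real installation.

```shell
#!/usr/bin/env bash
# Assumption: FLINK_HOME points at your Flink installation. If it is unset,
# fall back to a scratch directory so the sketch can be dry-run safely.
FLINK_HOME="${FLINK_HOME:-$(mktemp -d)}"
conf="$FLINK_HOME/conf/flink-conf.yaml"
mkdir -p "$FLINK_HOME/conf"
touch "$conf"

# Drop any existing slot setting, then append the suggested value.
grep -v '^taskmanager.numberOfTaskSlots:' "$conf" > "$conf.tmp" || true
echo 'taskmanager.numberOfTaskSlots: 4' >> "$conf.tmp"
mv "$conf.tmp" "$conf"

# List localhost 4 times so that 4 workers start on the local cluster.
for i in 1 2 3 4; do echo localhost; done > "$FLINK_HOME/conf/workers"
```

Note that the workers file is overwritten, so keep a copy first if it already lists other hosts.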
Now start the cluster:
# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# Start the Flink standalone cluster
./bin/start-cluster.sh
Step.3 start Flink SQL client
Hudi has a prepared bundle jar for Flink, which should be loaded in the Flink SQL Client when it starts up.
You can build the jar manually under the path hudi-source-dir/packaging/hudi-flink-bundle (see Build Flink Bundle Jar), or download it from the Apache Official Repository.
Now start the SQL CLI:
# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell
Please note the following:
- We suggest Hadoop 2.9.x or later, because some object storage systems only have a filesystem implementation from that version on
- The flink-parquet and flink-avro formats are already packaged into the hudi-flink-bundle jar
Set up the table name and base path, then operate using SQL for this guide. The SQL CLI only executes SQL line by line.
For the DataStream API, Hudi works with Flink 1.13, Flink 1.14 and Flink 1.15. Please add the desired dependency to your project:
<!-- Flink 1.13 -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.13-bundle</artifactId>
<version>0.12.2</version>
</dependency>
<!-- Flink 1.14 -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.14-bundle</artifactId>
<version>0.12.2</version>
</dependency>
<!-- Flink 1.15 -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.15-bundle</artifactId>
<version>0.12.2</version>
</dependency>
Insert Data
- Flink SQL
- DataStream API
First create a Flink Hudi table, then insert data into it using SQL VALUES as below.
-- sets up the result mode to tableau to show the results directly in the CLI
set sql-client.execution.result-mode = tableau;
CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${path}',
'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
);
-- insert data using values
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
First create a Flink Hudi table, then insert data into it using the DataStream API as below.
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String targetTable = "t1";
String basePath = "file:///tmp/t1";

// Table options: base path, table type and the field used to pick the latest record
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");

DataStream<RowData> dataStream = env.addSource(...);

// Declare the table schema, primary key and partition field
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
    .column("uuid VARCHAR(20)")
    .column("name VARCHAR(10)")
    .column("age INT")
    .column("ts TIMESTAMP(3)")
    .column("`partition` VARCHAR(20)")
    .pk("uuid")
    .partition("partition")
    .options(options);

builder.sink(dataStream, false); // The second parameter indicates whether the input data stream is bounded
env.execute("Api_Sink");
Query Data
- Flink SQL
- DataStream API
-- query from the Hudi table
select * from t1;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String targetTable = "t1";
String basePath = "file:///tmp/t1";

Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // this option enables the streaming read
options.put(FlinkOptions.READ_START_COMMIT.key(), "20210316134557"); // specifies the start commit instant time

// Declare the table schema, primary key and partition field
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
    .column("uuid VARCHAR(20)")
    .column("name VARCHAR(10)")
    .column("age INT")
    .column("ts TIMESTAMP(3)")
    .column("`partition` VARCHAR(20)")
    .pk("uuid")
    .partition("partition")
    .options(options);

DataStream<RowData> rowDataDataStream = builder.source(env);
rowDataDataStream.print();
env.execute("Api_Source");
This statement queries the snapshot view of the dataset. Refer to Table types and queries for more information on all supported table types and query types.
Update Data
This is similar to inserting new data.
-- this would update the record with key 'id1'
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
Notice that the save mode is now Append. In general, always use append mode unless you are trying to create the table for the first time.
Querying the data again will now show updated records. Each write operation generates a new commit denoted by the timestamp. Look for changes in the _hoodie_commit_time and age fields for the same _hoodie_record_keys compared with the previous commit.
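To see the commits that the writes produced, one option is to list the timeline files of the table. The sketch below rests on two assumptions that are not stated in this guide: the table lives on a local filesystem path, and completed instants appear as <instant>.commit or <instant>.deltacommit files under the .hoodie directory of the base path. The helper name list_instants is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the completed commit instants of a table,
# oldest first. Assumes a local base path and a .hoodie timeline directory
# whose completed instants are named <instant>.commit or <instant>.deltacommit.
list_instants() {
  ls "$1/.hoodie" 2>/dev/null \
    | grep -E '^[0-9]+\.(commit|deltacommit)$' \
    | cut -d. -f1 \
    | sort
}

# Base path from the DataStream API example (file:///tmp/t1).
list_instants /tmp/t1
```

Files for requested or inflight instants are filtered out by the pattern, so only finished writes are listed. For tables on HDFS or object storage, the same idea applies with that storage system's listing command.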
Streaming Query
Hudi Flink also provides the capability to obtain a stream of records that changed since a given commit timestamp. This can be achieved using Hudi's streaming query and providing a start time from which changes need to be streamed. We do not need to specify an endTime if we want all changes after the given commit (as is the common case).
The bundle jar with the hive profile is needed for streaming query. By default, the officially released Flink bundle is built without the hive profile, so the jar needs to be built manually; see Build Flink Bundle Jar for more details.
CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${path}',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- this option enables the streaming read
'read.start-commit' = '20210316134557', -- specifies the start commit instant time
'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);
-- Then query the table in stream mode
select * from t1;
This will give all changes that happened after the read.start-commit commit. The unique thing about this feature is that it now lets you author streaming pipelines on streaming or batch data sources.
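The read.start-commit value used in the example is a 14-digit timestamp in yyyyMMddHHmmss form. The following is a minimal sketch for deriving such a value from a wall-clock time, assuming GNU date is available and that the shell's time zone matches the one used for the table's commit timestamps. The helper name to_start_commit is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: format a wall-clock time as a yyyyMMddHHmmss string.
# Assumes GNU date (the -d flag); BSD/macOS date uses different flags.
to_start_commit() {
  date -d "$1" +%Y%m%d%H%M%S
}

to_start_commit "2021-03-16 13:45:57"   # prints 20210316134557
```

The printed value can then be pasted into the 'read.start-commit' table option.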
Delete Data
When consuming data in a streaming query, the Hudi Flink source can also accept the change logs from the underlying data source and then apply the UPDATE and DELETE operations at row level. You can then sync a near-real-time snapshot on Hudi for all kinds of RDBMS.
Where To Go From Here?
Check out the Flink Setup how-to page for a deeper dive into configuration settings.
If you are relatively new to Apache Hudi, it is important to be familiar with a few core concepts:
- Hudi Timeline – How Hudi manages transactions and other table services
- Hudi File Layout - How the files are laid out on storage
- Hudi Table Types – COPY_ON_WRITE and MERGE_ON_READ
- Hudi Query Types – Snapshot Queries, Incremental Queries, Read-Optimized Queries
See more in the "Concepts" section of the docs.
Take a look at recent blog posts that go in depth on certain topics or use cases.
Hudi tables can be queried from query engines like Hive, Spark, Flink, Presto and many more. We have put together a demo video that showcases all of this on a Docker-based setup with all dependent systems running locally. We recommend you replicate the same setup and run the demo yourself by following the steps here to get a taste of it. Also, if you are looking for ways to migrate your existing data to Hudi, refer to the migration guide.