Skip to main content
Version: 0.14.1

SQL DML

Spark SQL

SparkSQL provides several Data Manipulation Language (DML) actions for interacting with Hudi tables. These operations allow you to insert, update, merge and delete data from your Hudi tables. Let's explore them one by one.

Please refer to SQL DDL for creating Hudi tables using SQL.

Insert Into

You can use the INSERT INTO statement to add data to a Hudi table using Spark SQL. Here are some examples:

INSERT INTO <table> 
SELECT <columns> FROM <source>;
Deprecations

From 0.14.0, hoodie.sql.bulk.insert.enable and hoodie.sql.insert.mode are deprecated. Users are expected to use hoodie.spark.sql.insert.into.operation instead. To manage duplicates with INSERT INTO, please check out insert dup policy config.

Examples:

-- Insert into a copy-on-write (COW) Hudi table
INSERT INTO hudi_cow_nonpcf_tbl SELECT 1, 'a1', 20;

-- Insert into a merge-on-read (MOR) Hudi table
INSERT INTO hudi_mor_tbl SELECT 1, 'a1', 20, 1000;

-- Insert into a COW Hudi table with static partition
INSERT INTO hudi_cow_pt_tbl PARTITION(dt = '2021-12-09', hh='11') SELECT 2, 'a2', 1000;

-- Insert into a COW Hudi table with dynamic partition
INSERT INTO hudi_cow_pt_tbl PARTITION(dt, hh) SELECT 1 AS id, 'a1' AS name, 1000 AS ts, '2021-12-09' AS dt, '10' AS hh;
Mapping to write operations

Hudi offers flexibility in choosing the underlying write operation of a INSERT INTO statement using the hoodie.spark.sql.insert.into.operation configuration. Possible options include "bulk_insert" (large inserts), "insert" (with small file management), and "upsert" (with deduplication/merging). If a precombine field is not set, "insert" is chosen as the default. For a table with preCombine field set, "upsert" is chosen as the default operation.

Insert Overwrite

The INSERT OVERWRITE statement is used to replace existing data in a Hudi table.

INSERT OVERWRITE <table> 
SELECT <columns> FROM <source>;

All existing partitions that are affected by the INSERT OVERWRITE statement will replaced with the source data. Here are some examples:

-- Overwrite non-partitioned table
INSERT OVERWRITE hudi_mor_tbl SELECT 99, 'a99', 20.0, 900;
INSERT OVERWRITE hudi_cow_nonpcf_tbl SELECT 99, 'a99', 20.0;

-- Overwrite partitioned table with dynamic partition
INSERT OVERWRITE TABLE hudi_cow_pt_tbl SELECT 10, 'a10', 1100, '2021-12-09', '10';

-- Overwrite partitioned table with static partition
INSERT OVERWRITE hudi_cow_pt_tbl PARTITION(dt = '2021-12-09', hh='12') SELECT 13, 'a13', 1100;

Update

You can use the UPDATE statement to modify existing data in a Hudi table directly.

UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]

Here's an example:

-- Update data in a Hudi table
UPDATE hudi_mor_tbl SET price = price * 2, ts = 1111 WHERE id = 1;

-- Update data in a partitioned Hudi table
UPDATE hudi_cow_pt_tbl SET name = 'a1_1', ts = 1001 WHERE id = 1;

-- update using non-PK field
update hudi_cow_pt_tbl set ts = 1001 where name = 'a1';
info

The UPDATE operation requires the specification of a preCombineField.

Merge Into

The MERGE INTO statement allows you to perform more complex updates and merges against source data. The MERGE INTO statement is similar to the UPDATE statement, but it allows you to specify different actions for matched and unmatched records.

MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]

<merge_condition> =A equal bool condition
<matched_action> =
DELETE |
UPDATE SET * |
UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action> =
INSERT * |
INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

Examples below

-- source table using hudi for testing merging into non-partitioned table
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;

-- source table using parquet for testing merging into partitioned table
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');

MERGE into hudi_cow_pt_tbl as target
using (
select id, name, '1000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;

Key requirements

For a Hudi table with user configured primary keys, the join condition in Merge Into is expected to contain the primary keys of the table. For a Table where Hudi auto generates primary keys, the join condition in MIT can be on any arbitrary data columns.

Delete From

You can remove data from a Hudi table using the DELETE FROM statement.

DELETE FROM tableIdentifier [ WHERE boolExpression ]

Examples below

-- Delete data from a Hudi table
DELETE FROM hudi_cow_nonpcf_tbl WHERE uuid = 1;

-- Delete data from a MOR Hudi table based on a condition
DELETE FROM hudi_mor_tbl WHERE id % 2 = 0;

-- Delete data using a non-primary key field
DELETE FROM hudi_cow_pt_tbl WHERE name = 'a1';

Data Skipping and Indexing

DML operations can be sped up using column statistics for data skipping and using indexes to reduce the amount of data scanned. For e.g. the following helps speed up the DELETE operation on a Hudi table, by using the record level index.

SET hoodie.enable.data.skipping=true;
SET hoodie.metadata.record.index.enable=true;
SET hoodie.metadata.enable=true;

DELETE from hudi_table where uuid = 'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa';

These DML operations give you powerful tools for managing your tables using Spark SQL. You can control the behavior of these operations using various configuration options, as explained in the documentation.

Flink SQL provides several Data Manipulation Language (DML) actions for interacting with Hudi tables. These operations allow you to insert, update and delete data from your Hudi tables. Let's explore them one by one.

Insert Into

You can utilize the INSERT INTO statement to incorporate data into a Hudi table using Flink SQL. Here are a few illustrative examples:

INSERT INTO <table> 
SELECT <columns> FROM <source>;

Examples:

-- Insert into a Hudi table
INSERT INTO hudi_table SELECT 1, 'a1', 20;

If the write.operation is 'upsert,' the INSERT INTO statement will not only insert new records but also update existing rows with the same record key.

-- Insert into a Hudi table in upsert mode
INSERT INTO hudi_table/*+ OPTIONS('write.operation'='upsert')*/ SELECT 1, 'a1', 20;

Update

With Flink SQL, you can use update command to update the hudi table. Here are a few illustrative examples:

UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
UPDATE hudi_table SET price = price * 2, ts = 1111 WHERE id = 1;
Key requirements

Update query only work with batch excution mode.

Delete From

With Flink SQL, you can use delete command to delete the rows from hudi table. Here are a few illustrative examples:

DELETE FROM tableIdentifier [ WHERE boolExpression ]
DELETE FROM hudi_table WHERE price < 100;
DELETE FROM hudi_table WHERE price < 100;
Key requirements

Delete query only work with batch excution mode.

Setting Writer/Reader Configs

With Flink SQL, you can additionally set the writer/reader writer configs along with the query.

INSERT INTO hudi_table/*+ OPTIONS('${hoodie.config.key1}'='${hoodie.config.value1}')*/
INSERT INTO hudi_table/*+ OPTIONS('hoodie.keep.max.commits'='true')*/