
Record Mergers in Apache Hudi

Aditya Goenka
10 min read

The Challenge of Unordered Streams of Events

One of the primary challenges in streaming workloads is the unordered nature of incoming events. In a typical streaming scenario, events can arrive out of sequence due to network latency, processing delays, or other factors. With the increasing volume and velocity of data ingested from sources such as mobile applications and IoT platforms, data processing frameworks must be equipped to handle mutations (i.e., changes to records) and out-of-order events. Traditional storage systems and file formats, often optimized for batch processing, struggle to manage these scenarios effectively. Hudi steps in with features designed specifically for these challenges.

When changes to a record arrive at different times, they may not arrive in the order in which they were generated. For example, in a smart city traffic monitoring system, sensors report vehicle speeds at various intersections in real time, but due to network issues or delays, some sensor data may arrive later than others, possibly out of order. The system needs to merge the incoming data with existing records efficiently. Hudi's merge modes control how records with the same key are merged, ensuring that the final traffic data reflects the correct event times even when some data arrives with a delay. They maintain a consistent, deterministic result under heavy load, making sure that late data updates the right records without causing inconsistencies. Left unhandled, out-of-order events can lead to several issues:

Data Integrity

When events are processed out of order, it can result in incorrect or inconsistent data states. For example, if an event representing a transaction is processed before the event that indicates the account balance, the resulting data may not accurately reflect the true state of the system.

Complexity in Processing

Handling unordered events often requires additional logic to ensure that data is processed in the correct sequence. This can complicate the data pipeline and increase the likelihood of errors.

What Are Record Mergers?

With the new API introduced in version 1.0.0, Hudi supports three primary merge modes that govern how records with the same key are combined. Merging happens at four points in the data lifecycle, spanning writing, compaction, and querying:

1. Merging Input Data Before Writing: Combining Change Records During Writes

When new data arrives for an existing record, Hudi performs deduplication on the input dataset, combining multiple change records for the same record key before the write phase. This optimization also reduces the number of records written to the log files (in the case of MoR tables). By merging changes upfront, Hudi avoids writing unnecessary records, improving the efficiency of both write and query operations. This step is crucial for handling streaming data in real time, where changes may arrive rapidly, because it ensures that only the final version of each record is written to the system. Out-of-order events for the same key commonly arrive together in the same batch; with processing engines like Spark, which operate on micro-batches, merging the input changes reduces the number of records that need to be written.
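
To illustrate, here is a minimal sketch. The rides table is hypothetical, with the same schema as the hudi_table used later in this post; with upsert semantics, duplicate keys within a single batch are combined before anything reaches storage.

SET hoodie.spark.sql.insert.into.operation=upsert;

-- Two change records for the same key arrive in one micro-batch.
INSERT INTO rides
VALUES
(1,'key-1','rider-A','driver-K',10.00,'san_francisco'),
(2,'key-1','rider-B','driver-K',12.50,'san_francisco');
-- Only a single row for 'key-1' is written to the table; which one survives
-- depends on the table's merge mode, as described in the sections below.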

2. Merging Final Change Record in CoW (Copy-on-Write) Tables: Applying Changes to Existing Records

In Copy-on-Write (CoW) tables, changes are applied by creating new file versions for the records. When an update, partial update, or delete occurs, Hudi merges this final change with the existing record in storage. The merge mode controls how these updates are applied, ensuring that only the most recent changes are reflected and the table's data remains consistent. This is especially important for CoW tables, which preserve the immutability of historical data by writing new versions of records instead of overwriting existing data. The merge mode ensures that the new version of the record is consistent with all previous changes.
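
As a quick, hedged illustration (reusing the hudi_table schema from the examples below): a row-level UPDATE on a CoW table rewrites the affected file group into a new file version with the change merged in, so readers always see the merged row.

-- Rewrites the file group containing this record into a new file version.
UPDATE hudi_table SET fare = 25.0
WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';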

3. Compaction Merge in MoR (Merge-on-Read) Tables: Merging Log Files with Base Files

MoR tables in Hudi store data as base files (the original data) plus log files (delta logs). As changes to records accumulate over time, Hudi's compaction service merges the change records stored in the log files with the base files to keep the data consistent and query-optimized. The merge mode defines how these log records are merged with base files during compaction. Compaction helps maintain storage efficiency and ensures that queries run faster by reducing the number of small log files that need to be read.
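
Compaction can be triggered from Spark SQL. A minimal sketch, assuming Hudi's SQL procedures are available and a MoR table named hudi_table_mor (hypothetical; exact procedure options may vary by release):

-- Schedule a compaction plan, then execute it, merging delta logs into base files.
CALL run_compaction(op => 'schedule', table => 'hudi_table_mor');
CALL run_compaction(op => 'run', table => 'hudi_table_mor');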

4. Query-Time Merge: Merging Log Files with Base Files in MoR (Merge-on-Read) Tables

In Merge-on-Read (MoR) tables, the data is stored in both log files and base files. When a query is executed, Hudi merges the change records in the log files with the base files based on the merge mode. The merge operation occurs at query time to provide the final, consistent view of the data. By merging records at query time, Hudi ensures that queries reflect the most recent changes while maintaining query performance.
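
For example, when a MoR table is synced to a catalog such as Hive, it is typically exposed as two views: a read-optimized one that reads only base files, and a real-time one that performs the merge at query time. A hedged sketch, assuming a synced table named hudi_table_mor:

-- Read-optimized view: base files only; recent log-file changes may be missing.
SELECT * FROM hudi_table_mor_ro;

-- Real-time view: log files are merged with base files at query time.
SELECT * FROM hudi_table_mor_rt;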

Implementation

In common scenarios, the input data contains a field that can be used to identify the latest record. Typically, tables have fields like updated_at or other ordering columns. If no such column is present in the input, we are limited to relying on the incoming order.

The Hudi 1.0.0 release introduced a new configuration, hoodie.record.merge.mode, to define the merge mode responsible for handling record updates. These merge modes dictate how records with the same key are processed at different stages of the pipeline, from data ingestion to query results. The configuration can take one of three values:

1. COMMIT_TIME_ORDERING

This merge mode is used when no field is available in the input data to explicitly determine which record is the latest. The system relies on the order of ingestion (commit time) to order records: Hudi expects records to arrive in strict order of their commits, so the most recently ingested record is assumed to be the latest version. This mode is typically used when there is no dedicated column such as updated_at, a timestamp, or a versioning field to indicate record order. The merging logic simply picks the latest write based on commit time; in effect, this gives overwriting semantics where only the most recent record is kept. Example:

SET hoodie.spark.sql.insert.into.operation=upsert;

CREATE TABLE hudi_table (
  ts BIGINT,
  uuid STRING,
  rider STRING,
  driver STRING,
  fare DOUBLE,
  city STRING
) USING HUDI TBLPROPERTIES (
  primaryKey = 'uuid',
  hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'
);

INSERT INTO hudi_table
VALUES
(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250106162911278 20250106162911278_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 08218473-f72a-480d-90e6-c6764f062e5c-0_0-43-47_20250106162911278.parquet 2 334e26e9-8355-45cc-97c6-c31daf0df330 rider-C driver-M 27.7 san_francisco

INSERT INTO hudi_table
VALUES
(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',19.10,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250106163449812 20250106163449812_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 08218473-f72a-480d-90e6-c6764f062e5c-0_0-71-68_20250106163449812.parquet 1 334e26e9-8355-45cc-97c6-c31daf0df330 rider-D driver-K 19.1 san_francisco

In the example above, we created the table using the COMMIT_TIME_ORDERING merge mode. When using this mode, there is no need to specify a precombine or ordering field. During the first insert, two records with the same record key are provided. The system will deduplicate them and keep the record that is processed later. In the second insert, a new record with the same record key is inserted. As expected, the table is updated with the new record because it is committed later, regardless of the values in any of the fields.

2. EVENT_TIME_ORDERING (DEFAULT)

This merge mode is used when you do have a field in the input data that determines the order of events, such as a timestamp like updated_at or a version number. If your records contain a field that tracks when each record was last updated (e.g., updated_at, last_modified, or a sequence number), Hudi uses this field to determine which record is the latest. In this case, Hudi does not rely on ingestion order; instead, it uses the value of the ordering field to decide the correct record. This approach is ideal for temporal or event-driven data, where you want to keep the "latest" record according to its event timestamp. Example:

DROP TABLE hudi_table;
SET hoodie.spark.sql.insert.into.operation=upsert;

CREATE TABLE hudi_table (
  ts BIGINT,
  uuid STRING,
  rider STRING,
  driver STRING,
  fare DOUBLE,
  city STRING
) USING HUDI TBLPROPERTIES (
  primaryKey = 'uuid',
  preCombineField = 'ts',
  hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
);

INSERT INTO hudi_table
VALUES
(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250106165902806 20250106165902806_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 568ce7bc-9b71-4e15-b557-cbaeb5b4d2ea-0_0-56-57_20250106165902806.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco

INSERT INTO hudi_table
VALUES
(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',18.00,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250106165902806 20250106165902806_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 568ce7bc-9b71-4e15-b557-cbaeb5b4d2ea-0_0-84-78_20250106165918731.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco

In the example above, we created the table using the EVENT_TIME_ORDERING merge mode, which requires a preCombineField; here we specify ts. During the first insert, two records with the same record key are provided. The system deduplicates them and keeps the record with the higher ordering-field value, rider-A with ts = 3, not the record that was processed later. In the second insert, a new record with the same record key arrives with ts = 1. Even though it is committed later, the table is not updated, because its ordering field value is lower than the ts = 3 already in storage.
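
To make the contrast explicit, here is a hedged continuation (we omit the exact result metadata, since it depends on the commit): a record whose ordering field exceeds the stored value does replace the row.

INSERT INTO hudi_table
VALUES
(4,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-E','driver-N',20.00,'san_francisco');

SELECT * FROM hudi_table;
-- The row now reflects rider-E, because its ordering field (ts = 4) is the
-- highest value seen for this key.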

3. CUSTOM

For more complex use cases, the previously discussed merge modes may not be sufficient, and you may need to implement use-case-specific merging logic. The implementation details are documented here: https://hudi.apache.org/docs/record_merger/#custom
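
As a hedged sketch of what the configuration can look like: the merger class below is hypothetical, and the exact property for registering merger implementations has varied across releases, so consult the linked documentation for your version.

CREATE TABLE hudi_table_custom (
  ts BIGINT,
  uuid STRING,
  rider STRING,
  driver STRING,
  fare DOUBLE,
  city STRING
) USING HUDI TBLPROPERTIES (
  primaryKey = 'uuid',
  preCombineField = 'ts',
  hoodie.record.merge.mode = 'CUSTOM',
  -- Hypothetical class implementing Hudi's record merger API.
  hoodie.datasource.write.record.merger.impls = 'com.example.MyRecordMerger'
);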

Record Payloads

Prior to 1.0.0, Hudi used the legacy Record Payload API. Please refer to the Record Payloads section to learn about the implementation and some of the existing record payloads.

Along with the existing payloads, Hudi provides the flexibility to implement a custom record payload by implementing the HoodieRecordPayload interface.

The following example demonstrates the use of a Record Payload that achieves a similar outcome to EVENT_TIME_ORDERING. We use the same data as above to illustrate how this functionality works.

DROP TABLE hudi_table;
SET hoodie.spark.sql.insert.into.operation=upsert;

CREATE TABLE hudi_table (
  ts BIGINT,
  uuid STRING,
  rider STRING,
  driver STRING,
  fare DOUBLE,
  city STRING
) USING HUDI TBLPROPERTIES (
  primaryKey = 'uuid',
  preCombineField = 'ts',
  hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
);

INSERT INTO hudi_table
VALUES
(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250203164444124 20250203164444124_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 4549ed8e-0346-4d59-8878-9e047fb6c651-0_0-14-17_20250203164444124.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco


INSERT INTO hudi_table
VALUES
(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',18.00,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250203164444124 20250203164444124_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 4549ed8e-0346-4d59-8878-9e047fb6c651-0_0-53-51_20250203164537068.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco

As with EVENT_TIME_ORDERING, DefaultHoodieRecordPayload compares the preCombineField values when merging, so the incoming record with ts = 1 does not replace the stored record with ts = 3.

Conclusion

Managing late-arriving and out-of-order data is a critical challenge in modern data processing systems, especially in large-scale, real-time pipelines. Hudi's merge modes ensure data consistency, accuracy, and efficiency by handling record updates intelligently across the different stages of the pipeline. Whether you are working with streaming data, IoT sensors, or social media posts, understanding how to configure and use these merge modes can greatly improve the performance and reliability of your storage and query processes. By choosing the right merge strategy, you can keep your system robust even under heavy load and delayed data, ultimately enabling better decision-making and insights from your data.