Compaction
Background
Compaction is a table service employed by Hudi specifically in Merge On Read (MOR) tables to periodically merge updates from row-based log files into the corresponding columnar base file, producing a new version of the base file. Compaction does not apply to Copy On Write (COW) tables; it is specific to MOR tables.
Why do MOR tables need compaction?
To understand the significance of compaction in MOR tables, it is helpful to understand the MOR table layout first. In Hudi, data is organized in terms of file groups. Each file group in a MOR table consists of a base file and one or more log files. Typically, during writes, inserts are stored in the base file, and updates are appended to log files.
Figure: MOR table file layout showing different file groups with base data file and log files
During the compaction process, updates from the log files are merged with the base file to form a new version of the base file as shown below. Since MOR is designed to be write-optimized, on new writes, after index tagging is complete, Hudi appends the records pertaining to each file group as log blocks in log files. There is no synchronous merge during the write, resulting in lower write amplification and better write latency. In contrast, on new writes to a COW table, Hudi combines the new writes with the older base file to produce a new version of the base file, resulting in higher write amplification and higher write latencies.
Figure: Compaction on a given file group
While serving a read query (snapshot read), for each file group, records in the base file and all its corresponding log files are merged together and served. Hence, the read latency of a MOR snapshot query can be higher than that of a COW table, since no merge is involved at read time for COW. Compaction takes care of merging the updates from log files with the base file at regular intervals, to bound the growth of log files and ensure read latencies do not spike up.
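The merge performed during a snapshot read can be sketched as follows. This is an illustrative simplification, not Hudi's actual implementation: base-file records are combined with log-file updates per key, with later log entries winning.

```python
# Sketch: snapshot read of one MOR file group (illustrative, not Hudi code).
def snapshot_read(base_records, log_blocks):
    """base_records: {key: row}; log_blocks: ordered list of {key: row} updates."""
    merged = dict(base_records)      # start from the columnar base file
    for block in log_blocks:         # replay row-based log blocks in commit order
        merged.update(block)         # later updates overwrite earlier values
    return merged

base = {"id1": {"name": "a", "v": 1}, "id2": {"name": "b", "v": 1}}
logs = [{"id1": {"name": "a", "v": 2}},   # update to id1
        {"id3": {"name": "c", "v": 1}}]   # new record id3
print(snapshot_read(base, logs))
```

Compaction effectively materializes this same merge once, writing the result as a new base file so that subsequent reads skip the log replay.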
Compaction Architecture
There are two steps to compaction.
- Compaction Scheduling: In this step, Hudi scans the partitions and selects file slices to be compacted. A compaction plan is finally written to the Hudi timeline.
- Compaction Execution: In this step, the compaction plan is read and the selected file slices are compacted.
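The two steps above can be sketched as follows. The function names, the threshold-based selection, and the plan shape are all illustrative assumptions, not Hudi's actual APIs; the point is only that scheduling produces a plan on the timeline, and execution later consumes it.

```python
# Sketch of the two-step compaction flow (names and plan shape are illustrative).
def schedule_compaction(file_slices, threshold_bytes):
    """Scan file slices and produce a plan listing which slices to compact."""
    selected = [s["id"] for s in file_slices if s["log_bytes"] > threshold_bytes]
    # In Hudi, this plan would be serialized to the timeline as a compaction instant.
    return {"instant": "20240101000000", "slices": selected}

def execute_compaction(plan, file_slices):
    """Read the plan and rewrite each selected slice's base file (merged size here)."""
    by_id = {s["id"]: s for s in file_slices}
    return {fid: by_id[fid]["base_bytes"] + by_id[fid]["log_bytes"]
            for fid in plan["slices"]}
```

Separating the steps is what allows scheduling to run inline with ingestion while execution runs as a separate, possibly offline, job.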
Strategies in Compaction Scheduling
There are two strategies involved in scheduling the compaction:
- Trigger Strategy: Determines how often to trigger scheduling of the compaction.
- Compaction Strategy: Determines which file groups to compact.
Hudi provides various options for both these strategies as discussed below.
Trigger Strategies
Config Name | Default | Description |
---|---|---|
hoodie.compact.inline.trigger.strategy | NUM_COMMITS (Optional) | org.apache.hudi.table.action.compact.CompactionTriggerStrategy: Controls when compaction is scheduled. Config Param: INLINE_COMPACT_TRIGGER_STRATEGY |
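As a concrete example, the trigger strategy is typically paired with a threshold config. The config keys below are Hudi's documented option names; the values chosen are illustrative, not recommendations.

```python
# Illustrative pairing of a trigger strategy with its threshold
# (keys are Hudi config names; values here are example choices).
trigger_options = {
    "hoodie.compact.inline.trigger.strategy": "NUM_COMMITS",  # schedule after N delta commits
    "hoodie.compact.inline.max.delta.commits": "5",           # the N for NUM_COMMITS
}
```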
Compaction Strategies
Config Name | Default | Description |
---|---|---|
hoodie.compaction.strategy | org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy (Optional) | The compaction strategy decides which file groups are picked up for compaction during each compaction run. By default, Hudi picks the log file with the most accumulated unmerged data. Config Param: COMPACTION_STRATEGY |
Available Strategies (Provide the full package name when using the strategy):
- LogFileNumBasedCompactionStrategy: orders the compactions based on the total log file count, filters file groups with a log file count greater than the threshold, and limits the compactions to a configured IO bound.
- LogFileSizeBasedCompactionStrategy: orders the compactions based on the total log file size, filters file groups whose log file size is greater than the threshold, and limits the compactions to a configured IO bound.
- BoundedIOCompactionStrategy: looks at the total IO to be done for the compaction (read + write) and limits the list of compactions to stay under a configured IO limit.
- BoundedPartitionAwareCompactionStrategy: ensures that the last N partitions are picked up even if there are later partitions created for the table. lastNPartitions is defined as the N partitions before the currentDate. For example, with currentDay = 2018/01/01 and partitions 2018/02/02 and 2018/03/03 beyond the currentDay, this strategy picks the following partitions for compaction: (2018/01/01, allPartitionsInRange[(2018/01/01 - lastNPartitions) to 2018/01/01), 2018/02/02, 2018/03/03).
- DayBasedCompactionStrategy: orders compactions in reverse order of the creation of Hive partitions. It helps to compact data in the latest partitions first and then older ones, capped at the total IO allowed.
- UnBoundedCompactionStrategy: does not change the ordering or filter any compaction. It is a pass-through that compacts all base files which have a log file. This usually means no intelligence on compaction.
- UnBoundedPartitionAwareCompactionStrategy: a custom unbounded strategy. It filters out all partitions that are eligible to be compacted by BoundedPartitionAwareCompactionStrategy and returns the rest. This is done so that a long-running UnBoundedPartitionAwareCompactionStrategy does not step over partitions handled by a shorter-running BoundedPartitionAwareCompactionStrategy. Essentially, it is the inverse of the partitions chosen by BoundedPartitionAwareCompactionStrategy.
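The core idea behind the default, size-based strategy can be sketched as below. This is a rough illustration, not Hudi's actual code: the function name, the tuple layout, and the simple IO cost model (read base and logs, write a new base) are all assumptions made for the example.

```python
# Rough sketch of size-based selection: pick file groups with the most
# accumulated unmerged log data, over a threshold, within an IO budget.
def pick_file_groups(groups, size_threshold, io_budget):
    """groups: list of (name, log_bytes, base_bytes) tuples."""
    eligible = [g for g in groups if g[1] > size_threshold]
    eligible.sort(key=lambda g: g[1], reverse=True)   # most unmerged data first
    picked, spent = [], 0
    for name, log_bytes, base_bytes in eligible:
        cost = log_bytes + 2 * base_bytes   # read base + logs, write new base (rough model)
        if spent + cost > io_budget:
            break
        picked.append(name)
        spent += cost
    return picked
```

The threshold filter and IO cap are what keep each compaction run bounded, so a single run never tries to rewrite every file group in the table.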
Please refer to the advanced configs for more details.
Ways to trigger Compaction
Inline
By default, compaction is run asynchronously.
If latency of ingesting records is important to you, you are most likely using Merge On Read tables, which store data using a combination of columnar (e.g. Parquet) and row-based (e.g. Avro) file formats. Updates are logged to delta files and later compacted to produce new versions of the columnar files. To improve ingestion latency, async compaction is the default configuration.
If immediate read performance of a new commit is important to you, or you want the simplicity of not managing separate compaction jobs, you may want synchronous inline compaction, which means that as a commit is written, it is also compacted by the same job.
For this deployment mode, please use hoodie.compact.inline = true for Spark Datasource and Spark SQL writers. For HoodieStreamer's sync-once mode, inline compaction can be achieved by passing the flag --disable-compaction (meaning: disable async compaction). Further, in HoodieStreamer, when both ingestion and compaction run in the same Spark context, you can use resource allocation configurations in the HoodieStreamer CLI, such as --delta-sync-scheduling-weight, --compact-scheduling-weight, --delta-sync-scheduling-minshare, and --compact-scheduling-minshare, to control executor allocation between ingestion and compaction.
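For the Spark Datasource path, inline compaction boils down to a handful of writer options. The config keys below are Hudi's; the table name and threshold value are illustrative placeholders, not recommendations.

```python
# Illustrative Spark Datasource writer options enabling synchronous inline
# compaction for a MOR table (keys are Hudi config names; values are examples).
hudi_options = {
    "hoodie.table.name": "my_mor_table",              # hypothetical table name
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.compact.inline": "true",                  # compact within the same write job
    "hoodie.compact.inline.max.delta.commits": "4",   # e.g. compact every 4 delta commits
}
# Typically passed to the writer as:
#   df.write.format("hudi").options(**hudi_options).mode("append").save(base_path)
```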
Async & Offline Compaction models
There are a couple of ways to trigger compaction in these models.
Async execution within the same process
In streaming ingestion write models such as HoodieStreamer continuous mode, Flink, and Spark Streaming, async compaction is enabled by default and runs alongside regular ingestion without blocking it.