This is an automated email from the ASF dual-hosted git repository.

bhavanisudha pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
     new 2c802a31224 [DOCS] Added blog for record mergers in hudi (#12904)
2c802a31224 is described below

commit 2c802a312242ebe64a83c9a20b8e3b8a32e502ef
Author: Aditya Goenka <63430370+ad1happy...@users.noreply.github.com>
AuthorDate: Tue Mar 4 17:13:47 2025 +0530

    [DOCS] Added blog for record mergers in hudi (#12904)

    * Added record merger blog
    * updated image
---
 .../2025-03-03-record-mergers-in-apache-hudi.png   | Bin 0 -> 163237 bytes
 website/blog/2025-03-03-record-mergers-in-hudi.mdx | 164 +++++++++++++++++++++
 2 files changed, 164 insertions(+)

diff --git a/content/assets/images/blog/2025-03-03-record-mergers-in-apache-hudi.png b/content/assets/images/blog/2025-03-03-record-mergers-in-apache-hudi.png
new file mode 100644
index 00000000000..162622904ee
Binary files /dev/null and b/content/assets/images/blog/2025-03-03-record-mergers-in-apache-hudi.png differ

diff --git a/website/blog/2025-03-03-record-mergers-in-hudi.mdx b/website/blog/2025-03-03-record-mergers-in-hudi.mdx
new file mode 100644
index 00000000000..6ee14571eed
--- /dev/null
+++ b/website/blog/2025-03-03-record-mergers-in-hudi.mdx
@@ -0,0 +1,164 @@
---
title: "Record Mergers in Apache Hudi"
excerpt: "Explains the need for record mergers in Apache Hudi and their implementation details"
author: Aditya Goenka
category: blog
image: /assets/images/blog/2025-03-03-record-mergers-in-apache-hudi.png
tags:
- Data Lake
- Data Lakehouse
- Apache Hudi
- Record Mergers
- Record payloads
- Late Arriving Data
---

## The Challenge of Unordered Streams of Events
One of the primary challenges associated with streaming workloads is the unordered nature of incoming events. In a typical streaming scenario, events can arrive out of sequence due to network latency, processing delays, or other factors. With the increasing volume and velocity of data being ingested from various sources, especially mobile applications and IoT platforms, data processing frameworks must be equipped to handle mutations (i.e., changes to records) and out-of-order events.

Traditional data storage systems and file formats, such as those optimized for batch processing, often struggle to manage these scenarios effectively. Hudi steps in with features specifically designed to handle such challenges.

When events or changes to a record arrive at different times, they may not be in the same order in which they were originally generated. For example, in a smart-city traffic monitoring system, sensors may report vehicle speeds at various intersections in real time. Due to network issues or delays, some sensor data might arrive later than others, possibly out of order. To handle this, the system needs to merge the new incoming data with existing records efficiently. When such merging is not handled correctly, it can lead to several issues:

#### Data Integrity
When events are processed out of order, the result can be an incorrect or inconsistent data state. For example, if an event representing a transaction is processed before the event that indicates the account balance, the resulting data may not accurately reflect the true state of the system.

#### Complexity in Processing
Handling unordered events often requires additional logic to ensure that data is processed in the correct sequence. This can complicate the data pipeline and increase the likelihood of errors.
## What are Record Mergers?
With the new API introduced in version 1.0.0, Hudi supports three primary merge modes that govern how records are combined at the different stages of data processing: writing, compaction, and querying. Merging comes into play at four points of data processing:

#### 1. Merging Input Data Before Writing: Combining Change Records During Writes
When new data arrives for an existing record, Hudi performs deduplication on the input dataset, combining multiple change records for the same record key before the write phase. This is an optimization that also helps reduce the number of records written to the log files (in the case of MoR). By merging changes upfront, Hudi avoids writing unnecessary records, improving the efficiency of both write and query operations.
This step is crucial for handling streaming data in real time, where changes may arrive rapidly, and it ensures that only the final version of the record is written into the system. Out-of-order events for a record commonly arrive together in the same batch, and with processing engines like Spark, which process data in micro-batches, merging the input changes reduces the number of records that need to be written (a minimal sketch of this write path follows this list).

#### 2. Merging the Final Change Record in CoW (Copy-on-Write) Tables: Applying Changes to Existing Records
In Copy-on-Write (CoW) tables, changes are applied by creating new file versions for the records. When an update, partial update, or delete operation occurs, Hudi merges this final change with the existing record in storage. The merge mode controls how these updates are applied, ensuring that only the most recent changes are reflected and that the table's data remains consistent.
This is especially important in CoW tables, as they preserve the immutability of historical data by writing new versions of the records instead of overwriting the existing data. The merge mode ensures that the new version of the record is consistent with all previous changes.

#### 3. Compaction Merge in MoR (Merge-on-Read) Tables: Merging Log Files with Base Files
Hudi organizes MoR data into log files (delta logs) and base files (the original data). As changes to records accumulate over time, Hudi's compaction service merges the change records stored in the log files with the base files to keep the data consistent and query-optimized. The merge mode defines how these log records are merged with base files during the compaction process.
Compaction helps maintain storage efficiency and ensures that queries run faster by reducing the number of small log files that might need to be read.

#### 4. Query-Time Merge: Merging Log Files with Base Files in MoR (Merge-on-Read) Tables
In Merge-on-Read (MoR) tables, the data is stored in both log files and base files. When a query is executed, Hudi merges the change records in the log files with the base files based on the merge mode. The merge operation occurs at query time to provide the final, consistent view of the data.
By merging records at query time, Hudi ensures that queries reflect the most recent changes while maintaining query performance.
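To make the first merge point concrete, below is a minimal sketch in Java against the Spark datasource write path. The table name, base path, and sample rows are hypothetical; the configuration keys are standard Hudi write options.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PrecombineWriteSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("precombine-sketch").getOrCreate();

    // Hypothetical micro-batch: two change records for the same record key (uuid).
    Dataset<Row> batch = spark.sql(
        "SELECT 3L AS ts, '334e26e9' AS uuid, 'rider-A' AS rider UNION ALL "
            + "SELECT 2L AS ts, '334e26e9' AS uuid, 'rider-C' AS rider");

    batch.write().format("hudi")
        .option("hoodie.table.name", "hudi_table")
        .option("hoodie.datasource.write.recordkey.field", "uuid")
        // Changes sharing a record key are combined on this field before the
        // write, so with the default event-time ordering only the ts=3 change
        // for this key reaches storage.
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.datasource.write.partitionpath.field", "") // non-partitioned
        .option("hoodie.datasource.write.operation", "upsert")
        .mode(SaveMode.Append)
        .save("/tmp/hudi_table"); // hypothetical base path
  }
}
```

Because the two changes share a record key and are combined on ts before the write, only one record lands in the table; the SQL examples later in this post rely on the same behavior through the upsert operation.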
## Implementation
In common scenarios, the input data contains a field that can be used to identify the latest record. Typically, tables have fields like updated_at or other ordering columns. If no such column is present in the input, we are limited to relying on the incoming order.

With the release of Hudi 1.0.0, a new configuration, [hoodie.record.merge.mode](https://hudi.apache.org/docs/configurations/#hoodierecordmergemode), was introduced to define the merge modes responsible for handling record updates. These merge modes dictate how records with the same key are processed at different stages of the pipeline, from data ingestion to query results.
It can take the following three values:

#### 1. COMMIT_TIME_ORDERING
This merge mode is used when no field is available in the input data to explicitly determine which record is the latest. The system relies on the order of ingestion (commit time) to order records, and Hudi expects records to arrive in strict order of their commits. The most recent record (in terms of ingestion time) is assumed to be the latest version of the record. This mode is typically used when there is no dedicated column like updated_at, a timestamp, or a versioning field available in the data.
The merging logic here simply picks the latest write based on the ingestion order (commit time). In a way, it is equivalent to overwrite semantics where only the most recent record is considered.
Example -
```sql
SET hoodie.spark.sql.insert.into.operation=upsert;
CREATE TABLE hudi_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING HUDI TBLPROPERTIES (primaryKey = 'uuid', hoodie.record.merge.mode='COMMIT_TIME_ORDERING');

INSERT INTO hudi_table
VALUES
(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250106162911278 20250106162911278_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 08218473-f72a-480d-90e6-c6764f062e5c-0_0-43-47_20250106162911278.parquet 1695091554788 334e26e9-8355-45cc-97c6-c31daf0df330 rider-C driver-M 27.7 san_francisco

INSERT INTO hudi_table
VALUES
(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',19.10,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250106163449812 20250106163449812_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 08218473-f72a-480d-90e6-c6764f062e5c-0_0-71-68_20250106163449812.parquet 1 334e26e9-8355-45cc-97c6-c31daf0df330 rider-D driver-K 19.1 san_francisco
```

In the example above, we created the table using the COMMIT_TIME_ORDERING merge mode. When using this mode, there is no need to specify a precombine or ordering field.
During the first insert, two records with the same record key are provided. The system deduplicates them and keeps the record that is processed later (rider-C).
In the second insert, a new record with the same record key is inserted. As expected, the table is updated with the new record (rider-D) because it is committed later, regardless of the values in any of the fields.

#### 2. EVENT_TIME_ORDERING (DEFAULT)
This merge mode is used when you do have a field in the input data that can be used to determine the order of events, such as a timestamp field like updated_at or a version number. If your records contain a field that tracks when the record was last updated (e.g., updated_at, last_modified, or a sequence number), Hudi uses this field to determine which record is the latest.
In this case, Hudi does not rely on the ingestion order but instead uses the value of the ordering field (updated_at, for example) to decide the correct record.
This approach is ideal when you have temporal or event-driven data and you want to maintain the "latest" record according to an event timestamp.
Example -
```sql
DROP TABLE hudi_table;
SET hoodie.spark.sql.insert.into.operation=upsert;

CREATE TABLE hudi_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING HUDI TBLPROPERTIES (primaryKey = 'uuid', preCombineField = 'ts', hoodie.record.merge.mode='EVENT_TIME_ORDERING');

INSERT INTO hudi_table
VALUES
(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250106165902806 20250106165902806_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 568ce7bc-9b71-4e15-b557-cbaeb5b4d2ea-0_0-56-57_20250106165902806.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco

INSERT INTO hudi_table
VALUES
(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',18.00,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250106165902806 20250106165902806_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 568ce7bc-9b71-4e15-b557-cbaeb5b4d2ea-0_0-84-78_20250106165918731.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco
```

In the example above, we created the table using the EVENT_TIME_ORDERING merge mode. When using this mode, we need to specify the precombine field; here we use ts.
During the first insert, two records with the same record key are provided. The system deduplicates them and keeps the record with the higher ts value (rider-A, with ts = 3), not the one processed later.
In the second insert, a new record with the same record key but a lower ts value (1) is inserted. As expected, the table still returns the existing record (rider-A), because the winner is decided by the ordering field rather than the commit time.

#### 3. CUSTOM
For more complex use cases, the previously discussed merge modes are sometimes not enough, and we may need to implement use-case-specific merging logic. The implementation details are described in the [custom merge mode documentation](https://hudi.apache.org/docs/record_merger/#custom).

## Record Payloads
Before 1.0.0, Hudi used the legacy Record Payload API. Please refer to the [Record Payloads](https://hudi.apache.org/docs/record_merger/#record-payloads) section of the docs to learn about the implementation and some of the existing record payloads.

Along with the existing payloads, Hudi provides the flexibility to implement a custom record payload by implementing the [HoodieRecordPayload](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java) interface.
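As an illustration, here is a minimal sketch of such a custom payload, assuming the pre-1.0 Avro payload API. It extends the built-in OverwriteWithLatestAvroPayload and, on update, keeps whichever version of the record has the higher fare instead of the higher ordering value. The class name and the fare-based rule are hypothetical, chosen only to show the shape of the interface.

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

// Hypothetical payload: when merging an incoming change with the stored
// record, keep whichever version has the higher `fare` value.
public class HighestFarePayload extends OverwriteWithLatestAvroPayload {

  public HighestFarePayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
      throws IOException {
    // getInsertValue deserializes the incoming change; empty signals a delete.
    Option<IndexedRecord> incomingOpt = getInsertValue(schema);
    if (!incomingOpt.isPresent()) {
      return Option.empty();
    }
    GenericRecord incoming = (GenericRecord) incomingOpt.get();
    GenericRecord existing = (GenericRecord) currentValue;
    double incomingFare = (Double) incoming.get("fare");
    double existingFare = (Double) existing.get("fare");
    // Keep the record with the higher fare; ties go to the incoming record.
    IndexedRecord winner = incomingFare >= existingFare ? incoming : existing;
    return Option.of(winner);
  }

  // preCombine (deduplication within a single batch) is inherited from
  // OverwriteWithLatestAvroPayload, which keeps the payload with the
  // greater ordering value.
}
```

A payload like this is wired in through the hoodie.datasource.write.payload.class table property, in the same way the built-in DefaultHoodieRecordPayload is configured in the example that follows.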
The following example demonstrates the use of a record payload (the built-in DefaultHoodieRecordPayload), which achieves a similar outcome to EVENT_TIME_ORDERING. We have used the same data as above to illustrate how this functionality works.

```sql
DROP TABLE hudi_table;
SET hoodie.spark.sql.insert.into.operation=upsert;

CREATE TABLE hudi_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING HUDI TBLPROPERTIES (primaryKey = 'uuid', preCombineField = 'ts', hoodie.datasource.write.payload.class='org.apache.hudi.common.model.DefaultHoodieRecordPayload');

INSERT INTO hudi_table
VALUES
(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250203164444124 20250203164444124_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 4549ed8e-0346-4d59-8878-9e047fb6c651-0_0-14-17_20250203164444124.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco

INSERT INTO hudi_table
VALUES
(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',18.00,'san_francisco');

SELECT * FROM hudi_table;
-- Result - 20250203164444124 20250203164444124_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 4549ed8e-0346-4d59-8878-9e047fb6c651-0_0-53-51_20250203164537068.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco
```

As with EVENT_TIME_ORDERING, the second insert carries a lower ts value (1), so the payload retains the existing record (rider-A, ts = 3) rather than the late-arriving change.

## Conclusion
In conclusion, managing late-arriving and out-of-order data is a critical challenge in modern data processing systems, especially when dealing with large-scale, real-time data pipelines. Tools like Hudi provide powerful merge modes that ensure data consistency, accuracy, and efficiency by handling record updates intelligently across different stages of the pipeline. Whether you are working with streaming data, IoT sensors, or social media posts, understanding how to configure and use these merge modes and record payloads will help you build reliable, consistent, and query-efficient tables.