This is an automated email from the ASF dual-hosted git repository.

bhavanisudha pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 2c802a31224 [DOCS] Added blog for  record mergers in hudi  (#12904)
2c802a31224 is described below

commit 2c802a312242ebe64a83c9a20b8e3b8a32e502ef
Author: Aditya Goenka <63430370+ad1happy...@users.noreply.github.com>
AuthorDate: Tue Mar 4 17:13:47 2025 +0530

    [DOCS] Added blog for  record mergers in hudi  (#12904)
    
    * Added record merger blog
    
    * updated image
---
 .../2025-03-03-record-mergers-in-apache-hudi.png   | Bin 0 -> 163237 bytes
 website/blog/2025-03-03-record-mergers-in-hudi.mdx | 164 +++++++++++++++++++++
 2 files changed, 164 insertions(+)

diff --git 
a/content/assets/images/blog/2025-03-03-record-mergers-in-apache-hudi.png 
b/content/assets/images/blog/2025-03-03-record-mergers-in-apache-hudi.png
new file mode 100644
index 00000000000..162622904ee
Binary files /dev/null and 
b/content/assets/images/blog/2025-03-03-record-mergers-in-apache-hudi.png differ
diff --git a/website/blog/2025-03-03-record-mergers-in-hudi.mdx 
b/website/blog/2025-03-03-record-mergers-in-hudi.mdx
new file mode 100644
index 00000000000..6ee14571eed
--- /dev/null
+++ b/website/blog/2025-03-03-record-mergers-in-hudi.mdx
@@ -0,0 +1,164 @@
+---
+title: "Record Mergers in Apache Hudi"
+excerpt: "Explains the need for record mergers in Apache Hudi and their 
implementation details"
+author: Aditya Goenka
+category: blog
+image: /assets/images/blog/2025-03-03-record-mergers-in-apache-hudi.png
+tags:
+- Data Lake
+- Data Lakehouse
+- Apache Hudi
+- Record Mergers
+- Record payloads
+- Late Arriving Data
+---
+
+## The Challenge of Unordered Streams of Events
+One of the primary challenges associated with streaming workloads is the 
unordered nature of incoming events. In a typical streaming scenario, events 
can arrive out of sequence due to network latency, processing delays, or other 
factors. With the increasing volume and velocity of data being ingested from 
various sources—especially in mobile applications and IoT platforms—data 
processing frameworks must be equipped to handle mutations (i.e., changes to 
records) and out-of-order events.
+Traditional data storage systems and file formats, such as those optimized for 
batch processing, often struggle to manage these scenarios effectively. Hudi 
steps in with features specifically designed to handle such challenges.
+When events or changes to a record arrive at different times, they may not be 
in the same order in which they were originally generated. For example, in a 
smart city traffic monitoring system, sensors may report vehicle speeds at 
various intersections in real-time. However, due to network issues or delays, 
some sensor data might arrive later than others, possibly out of order. To 
handle this, the system needs to merge the new incoming data with existing 
records efficiently. Just like how [...]
+This can lead to several issues:
+#### Data Integrity
+When events are processed out of order, it can result in incorrect or 
inconsistent data states. For example, if an event representing a transaction 
is processed before the event that indicates the account balance, the resulting 
data may not accurately reflect the true state of the system.
+#### Complexity in Processing
+Handling unordered events often requires additional logic to ensure that data 
is processed in the correct sequence. This can complicate the data pipeline and 
increase the likelihood of errors.
+
+## What are Record Mergers
+With the new API introduced in version 1.0.0, Hudi supports three primary 
merge modes. The configured mode is applied at every stage where records with 
the same key meet: writing, compaction, and querying.
+Merging takes place at the following four points of data processing:
+#### 1. Merging Input Data Before Writing: Combining Change Records During 
Writes
+When new data arrives for an existing record, Hudi first deduplicates the 
input dataset by combining multiple change records for the same record key 
before the write phase. This optimization reduces the number of records 
written to the log files (in the case of MoR tables), improving the efficiency 
of both query and write operations.
+This step is crucial for handling streaming data in real time, where changes 
may arrive rapidly, because it ensures that only the final version of each 
record in a batch is written. Out-of-order events for the same key commonly 
arrive together in the same batch. With processing engines like Spark, which 
works on micro-batches, merging the input changes reduces the number of 
records that need to be written.
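
The pre-write deduplication described above can be sketched in plain Python. This is a minimal model, not Hudi's implementation: the function name `precombine`, the dict-based records, and the tie-breaking rule (on equal ordering values the later record wins) are assumptions made for illustration.

```python
from typing import Optional


def precombine(batch: list[dict], key_field: str,
               ordering_field: Optional[str] = None) -> list[dict]:
    """Collapse a batch to one record per key before it is written."""
    latest: dict = {}
    for record in batch:
        key = record[key_field]
        current = latest.get(key)
        if current is None:
            latest[key] = record
        elif ordering_field is None:
            # No ordering field: the record seen later in the batch wins.
            latest[key] = record
        elif record[ordering_field] >= current[ordering_field]:
            # Ordering field present: the higher value wins
            # (assumption: ties go to the later record).
            latest[key] = record
    return list(latest.values())


batch = [
    {"uuid": "k1", "ts": 3, "rider": "rider-A"},
    {"uuid": "k1", "ts": 2, "rider": "rider-C"},
    {"uuid": "k2", "ts": 1, "rider": "rider-B"},
]
by_arrival = precombine(batch, "uuid")
by_event_time = precombine(batch, "uuid", ordering_field="ts")
print([r["rider"] for r in by_arrival])     # ['rider-C', 'rider-B']
print([r["rider"] for r in by_event_time])  # ['rider-A', 'rider-B']
```

Either way, three input records shrink to two before anything is written, which is the saving this step is meant to provide.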
+#### 2. Merging Final Change Record in CoW (Copy-on-Write) Tables: Applying 
Changes to Existing Records
+In Copy-on-Write (CoW) tables, changes are applied by creating new file 
versions for the records. When an update, partial update, or delete operation 
occurs, Hudi will merge this final change with the existing record in the 
storage. The merge mode controls how these updates are applied, ensuring that 
only the most recent changes are reflected and the table’s data remains 
consistent.
+This is especially important in CoW tables, as they preserve immutability of 
historical data by writing new versions of the records instead of overwriting 
the existing data. The merge mode ensures that the new version of the record is 
consistent with all previous changes.
+#### 3. Compaction Merge in MoR (Merge-on-Read) Tables: Merging Log Files 
with Base Files
+Hudi uses a concept of log files (delta logs) and base files (original data). 
As changes to records accumulate over time, Hudi’s compaction service merges 
the change records stored in the log files with the base files to keep the data 
consistent and query-optimized. The merge mode defines how these log records 
are merged with base files during the compaction process.
+Compaction helps maintain storage efficiency and ensures that queries run 
faster by reducing the number of small log files that might need to be read.
+#### 4. Query-Time Merge: Merging Log Files with Base Files in MoR 
(Merge-on-Read) Tables
+In Merge-on-Read (MoR) tables, the data is stored in both log files and base 
files. When a query is executed, Hudi merges the change records in the log 
files with the base files based on the merge mode. The merge operation occurs 
at query time to provide the final, consistent view of the data.
+By merging records at query time, Hudi ensures that queries reflect the most 
recent changes while maintaining query performance.
+
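
To make the query-time merge concrete, here is a small Python sketch that overlays log records on base-file records. It assumes event-time style ordering on a `ts` field. The name `snapshot_read` and the in-memory lists standing in for base and log files are hypothetical, not Hudi's reader API.

```python
def snapshot_read(base_records, log_records, ordering_field):
    """Return the merged view: base-file records overlaid with log records."""
    view = {r["uuid"]: r for r in base_records}
    for change in log_records:  # applied in the order they were logged
        existing = view.get(change["uuid"])
        if existing is None or change[ordering_field] >= existing[ordering_field]:
            view[change["uuid"]] = change
    return sorted(view.values(), key=lambda r: r["uuid"])


base_file = [
    {"uuid": "a", "ts": 5, "fare": 19.10},
    {"uuid": "b", "ts": 5, "fare": 27.70},
]
log_files = [
    {"uuid": "a", "ts": 7, "fare": 21.00},  # newer update: replaces the base record
    {"uuid": "b", "ts": 2, "fare": 9.99},   # late-arriving older event: ignored
    {"uuid": "c", "ts": 6, "fare": 33.90},  # new key: present only in the log
]
merged = snapshot_read(base_file, log_files, "ts")
print([(r["uuid"], r["fare"]) for r in merged])
# [('a', 21.0), ('b', 27.7), ('c', 33.9)]
```

Conceptually, compaction performs the same merge but persists the result as a new base file, so later queries have fewer log records to overlay.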
+
+## Implementation
+In common scenarios, the input data contains a field that can be used to 
identify the latest record. Typically, tables have fields like updated_at or 
other ordering columns. If no such column is present in the input, we are 
limited to relying on the incoming order.
+
+After the release of Hudi 1.0.0, a new configuration, 
[hoodie.record.merge.mode](https://hudi.apache.org/docs/configurations/#hoodierecordmergemode)
 was introduced to define the merge modes responsible for handling record 
updates. These merge modes dictate how records with the same key are processed 
at different stages of the pipeline, from data ingestion to query results.
+It can have the following three values:
+
+#### 1. COMMIT_TIME_ORDERING
+This merge mode is used when no field is available in the input data to 
explicitly determine which record is the latest. The system will rely on the 
order of ingestion (commit time) to determine the order of records. Hudi 
expects records to arrive in strict order of their commits. So, the most recent 
record (in terms of ingestion time) is assumed to be the latest version of the 
record. This mode is typically used when there is no dedicated column like 
updated_at, timestamp, or versioning [...]
+The merging logic here simply picks the latest write based on the ingestion 
order (commit time). In a way, it's equivalent to overwriting semantics where 
only the most recent record is considered.
+Example -
+```sql
+SET hoodie.spark.sql.insert.into.operation=upsert;
+CREATE TABLE hudi_table (
+    ts BIGINT,
+    uuid STRING,
+    rider STRING,
+    driver STRING,
+    fare DOUBLE,
+    city STRING
+) USING HUDI TBLPROPERTIES (primaryKey = 'uuid', 
hoodie.record.merge.mode='COMMIT_TIME_ORDERING');
+
+INSERT INTO hudi_table
+VALUES
+(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
+(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70 
,'san_francisco');
+
+select * from hudi_table;
+-- Result - 20250106162911278  20250106162911278_0_0   
334e26e9-8355-45cc-97c6-c31daf0df330            
08218473-f72a-480d-90e6-c6764f062e5c-0_0-43-47_20250106162911278.parquet        
1695091554788   334e26e9-8355-45cc-97c6-c31daf0df330    rider-C driver-M        
27.7    san_francisco
+
+INSERT INTO hudi_table
+VALUES
+(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',19.10,'san_francisco');
+
+select * from hudi_table;
+-- Result - 20250106163449812  20250106163449812_0_0   
334e26e9-8355-45cc-97c6-c31daf0df330            
08218473-f72a-480d-90e6-c6764f062e5c-0_0-71-68_20250106163449812.parquet        
1       334e26e9-8355-45cc-97c6-c31daf0df330    rider-D driver-K        19.1    
san_francisco
+```
+
+In the example above, we created the table using the COMMIT_TIME_ORDERING 
merge mode. When using this mode, there is no need to specify a precombine or 
ordering field.
+During the first insert, two records with the same record key are provided. 
The system will deduplicate them and keep the record that is processed later.
+In the second insert, a new record with the same record key is inserted. As 
expected, the table is updated with the new record because it is committed 
later, regardless of the values in any of the fields.
+
+#### 2. EVENT_TIME_ORDERING (DEFAULT)
+This merge mode is used when you do have a field in the input data that can be 
used to determine the order of events (such as a timestamp field like 
updated_at or a version number). If your records contain a field that can be 
used to track when the record was last updated (e.g., updated_at, 
last_modified, or a sequence number), Hudi will use this field to determine 
which record is the latest.
+In this case, Hudi does not rely on the ingestion order but instead uses the 
value of the ordering field (updated_at, for example) to decide the correct 
record.
+This approach is ideal when you have temporal or event-driven data, and you 
want to maintain the "latest" record according to an event timestamp.
+Example -
+```sql
+DROP TABLE hudi_table;
+SET hoodie.spark.sql.insert.into.operation=upsert;
+
+CREATE TABLE hudi_table (
+    ts BIGINT,
+    uuid STRING,
+    rider STRING,
+    driver STRING,
+    fare DOUBLE,
+    city STRING
+) USING HUDI TBLPROPERTIES (primaryKey = 'uuid',preCombineField = 'ts', 
hoodie.record.merge.mode='EVENT_TIME_ORDERING');
+
+INSERT INTO hudi_table
+VALUES
+(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
+(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70 
,'san_francisco');
+
+select * from hudi_table;
+-- Result - 20250106165902806  20250106165902806_0_0   
334e26e9-8355-45cc-97c6-c31daf0df330            
568ce7bc-9b71-4e15-b557-cbaeb5b4d2ea-0_0-56-57_20250106165902806.parquet        
3       334e26e9-8355-45cc-97c6-c31daf0df330    rider-A driver-K        19.1    
san_francisco
+
+INSERT INTO hudi_table
+VALUES
+(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',18.00,'san_francisco');
+
+select * from hudi_table;
+-- Result - 20250106165902806  20250106165902806_0_0   
334e26e9-8355-45cc-97c6-c31daf0df330            
568ce7bc-9b71-4e15-b557-cbaeb5b4d2ea-0_0-84-78_20250106165918731.parquet        
3       334e26e9-8355-45cc-97c6-c31daf0df330    rider-A driver-K        19.1    
san_francisco
+```
+
+In the example above, we created the table using the EVENT_TIME_ORDERING merge 
mode. When using this mode, we need to specify the preCombineField. In this 
case, we specify ts as the preCombineField.
+During the first insert, two records with the same record key are provided. 
The system deduplicates them and keeps the record with the higher ts value 
(ts = 3, rider-A), even though it is not the last one in the batch.
+In the second insert, a new record with the same record key but a lower ts 
value (ts = 1) is inserted. The table still returns the existing record 
(ts = 3), because the incoming record is older according to the ordering 
field, regardless of when it was committed.
+
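
The difference between the two modes can be replayed outside Hudi with a few lines of Python. This is a sketch of the decision rule only: the `merge` and `replay` helpers are hypothetical, and the records keep just the `ts` and `rider` fields from the SQL examples above.

```python
MODES = ("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING")


def merge(existing, incoming, mode, ordering_field="ts"):
    """Pick the surviving record for one key under the given merge mode."""
    if mode not in MODES:
        raise ValueError(f"unsupported merge mode: {mode}")
    if existing is None or mode == "COMMIT_TIME_ORDERING":
        # Commit-time ordering: whatever arrives last wins.
        return incoming
    # Event-time ordering: the higher ordering value wins.
    return incoming if incoming[ordering_field] >= existing[ordering_field] else existing


def replay(writes, mode):
    """Apply the writes one after another, as the two INSERTs do."""
    stored = None
    for record in writes:
        stored = merge(stored, record, mode)
    return stored


writes = [
    {"ts": 3, "rider": "rider-A"},
    {"ts": 2, "rider": "rider-C"},
    {"ts": 1, "rider": "rider-D"},
]
commit_time_result = replay(writes, "COMMIT_TIME_ORDERING")
event_time_result = replay(writes, "EVENT_TIME_ORDERING")
print(commit_time_result["rider"])  # rider-D
print(event_time_result["rider"])   # rider-A
```

The two printed riders match the final `select` results shown for each mode: commit-time ordering ends on rider-D, while event-time ordering keeps rider-A.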
+#### 3. CUSTOM
+For more complex use cases, the merge modes discussed above may not be 
sufficient, and we may need to implement use-case-specific merging logic.
+The implementation details are provided here - 
https://hudi.apache.org/docs/record_merger/#custom
+
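
As an illustration of the kind of rule that needs custom logic, the sketch below models a partial update: the newer event wins, but fields it leaves null fall back to the stored value. The function `merge_partial_update` is hypothetical and written in plain Python. It shows only the merge rule, not how such a rule is plugged into Hudi, which the linked documentation covers.

```python
def merge_partial_update(existing, incoming, ordering_field="ts"):
    """Hypothetical custom rule: newer event wins, null fields keep the stored value."""
    if existing is None:
        return dict(incoming)
    if incoming[ordering_field] < existing[ordering_field]:
        # Older event: keep what is already stored.
        return dict(existing)
    merged = dict(existing)
    merged.update({k: v for k, v in incoming.items() if v is not None})
    return merged


stored = {"uuid": "k1", "ts": 3, "rider": "rider-A", "driver": "driver-K", "fare": 19.10}
update = {"uuid": "k1", "ts": 4, "rider": None, "driver": None, "fare": 22.50}
result = merge_partial_update(stored, update)
print(result)
# {'uuid': 'k1', 'ts': 4, 'rider': 'rider-A', 'driver': 'driver-K', 'fare': 22.5}
```

Neither COMMIT_TIME_ORDERING nor EVENT_TIME_ORDERING produces this result, because both pick one whole record and do not combine fields.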
+## Record Payloads
+Prior to 1.0.0, Hudi used the legacy Record Payload API. Please refer to the 
[Record Payloads](https://hudi.apache.org/docs/record_merger/#record-payloads) 
section to learn about the implementation and some of the existing record 
payloads.
+
+Along with the existing payloads, Hudi provides the flexibility to write a 
custom record payload by implementing the 
[HoodieRecordPayload](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java)
 interface.
+
+The following example demonstrates the use of Record Payload, which achieves a 
similar outcome to what EVENT_TIME_ORDERING does. We’ve used the same example 
as above to illustrate how this functionality works.
+
+```sql
+DROP TABLE hudi_table;
+SET hoodie.spark.sql.insert.into.operation=upsert;
+
+CREATE TABLE hudi_table (
+    ts BIGINT,
+    uuid STRING,
+    rider STRING,
+    driver STRING,
+    fare DOUBLE,
+    city STRING
+) USING HUDI TBLPROPERTIES (primaryKey = 'uuid',preCombineField = 'ts', 
hoodie.datasource.write.payload.class='org.apache.hudi.common.model.DefaultHoodieRecordPayload');
+
+INSERT INTO hudi_table
+VALUES
+(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
+(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70 
,'san_francisco');
+
+select * from hudi_table;
+-- Result - 20250203164444124  20250203164444124_0_0   
334e26e9-8355-45cc-97c6-c31daf0df330            
4549ed8e-0346-4d59-8878-9e047fb6c651-0_0-14-17_20250203164444124.parquet        
3       334e26e9-8355-45cc-97c6-c31daf0df330    rider-A driver-K        19.1    
san_francisco
+
+
+INSERT INTO hudi_table
+VALUES
+(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',18.00,'san_francisco');
+
+select * from hudi_table;
+-- Result - 20250203164444124  20250203164444124_0_0   
334e26e9-8355-45cc-97c6-c31daf0df330            
4549ed8e-0346-4d59-8878-9e047fb6c651-0_0-53-51_20250203164537068.parquet        
3       334e26e9-8355-45cc-97c6-c31daf0df330    rider-A driver-K        19.1    
san_francisco
+```
+
+## Conclusion
+In conclusion, managing late-arriving and out-of-order data is a critical 
challenge in modern data processing systems, especially when dealing with 
large-scale, real-time data pipelines. Tools like Hudi provide powerful merge 
modes that ensure data consistency, accuracy, and efficiency by handling record 
updates intelligently across different stages of the pipeline. Whether you're 
working with streaming data, IoT sensors, or social media posts, understanding 
how to configure and use thes [...]
