SatyajitMahunta opened a new issue, #13478:
URL: https://github.com/apache/hudi/issues/13478

   **CRITICAL ISSUE**: `ArrayIndexOutOfBoundsException: Index -1 out of bounds 
for length X` when running a Flink batch job with schema evolution on an existing 
Hudi MERGE_ON_READ table (changing from X columns to X+1 columns).
   
   **Root Cause**: The error occurs in 
`ParquetSplitReaderUtil.genPartColumnarRowReader()` during the UPSERT operation, 
when Hudi's internal read path attempts to map the X+1 columns of the new schema 
onto existing Parquet files that contain only X columns. The lookup for the 
missing column returns index -1, which then causes the array access exception.
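   The failure mode can be illustrated with a minimal sketch. This is a hypothetical simplification, not the actual Hudi code: the class and method names below (`IndexMappingSketch`, `mapRequestedToFileSchema`) are invented for illustration, but the pattern — resolving requested field names to file-schema positions with an `indexOf`-style lookup that yields -1 for absent columns — matches the observed exception:

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class IndexMappingSketch {
       // Hypothetical simplification of the lookup inside
       // ParquetSplitReaderUtil.genPartColumnarRowReader(): each field of the
       // requested (new) schema is resolved to its position in the file schema.
       static int[] mapRequestedToFileSchema(List<String> fileFields,
                                             List<String> requestedFields) {
           int[] indices = new int[requestedFields.size()];
           for (int i = 0; i < requestedFields.size(); i++) {
               // indexOf returns -1 when the Parquet file was written before
               // the column existed; that -1 later flows into an array access.
               indices[i] = fileFields.indexOf(requestedFields.get(i));
           }
           return indices;
       }

       public static void main(String[] args) {
           List<String> fileFields = Arrays.asList(
               "id1", "id2", "field1", "field2", "field3");          // 5-column file
           List<String> requested = Arrays.asList(
               "id1", "id2", "field1", "field2", "field3", "new_field"); // 6-column schema
           System.out.println(Arrays.toString(
               mapRequestedToFileSchema(fileFields, requested)));
           // prints: [0, 1, 2, 3, 4, -1]
           // Indexing a length-5 reader array with -1 then throws
           // ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 5
       }
   }
   ```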
   
   **Production Impact**: This blocks adding new tracking fields to a data 
pipeline that processes millions of records. Production constraints prevent us 
from deleting the existing historical data.
   
   **When Error Occurs**: During the UPSERT operation (not table creation) when 
Hudi internally reads existing data to merge with new records.
   
   **Related**: This appears to be the same issue as 
[HUDI-6103](https://issues.apache.org/jira/browse/HUDI-6103) in Apache Hudi 
JIRA.
   
   **Note**: This issue occurs when adding ANY new field to an existing 
MERGE_ON_READ table. The example below uses 5 columns → 6 columns, but the same 
error occurs with any column count (e.g., 104 → 105 columns in our production 
case).
   
   **To Reproduce**
   
   **Scenario**: Flink batch jobs with automatic table lifecycle management
   
   **Table Configuration**:
   - **Table Type**: MERGE_ON_READ
   - **Index Type**: BUCKET (`hoodie.bucket.index.num.buckets=10`)
   - **Partitioning**: yr
   - **Primary Keys**: id1, id2
   
   **Steps to reproduce the behavior:**
   
   1. **Run Flink Batch Job 1 with 5 columns** and write data using 
**bulk_insert**:
   
   ```sql
   -- Flink Job 1: Creates table and writes data using bulk_insert
   CREATE TABLE test_table (
       id1 STRING,
       id2 STRING,
       field1 STRING,
       field2 STRING,
       field3 STRING,
       yr INT,
       PRIMARY KEY (id1, id2) NOT ENFORCED
   ) PARTITIONED BY (yr) WITH (
       'connector' = 'hudi',
       'path' = 'file:///tmp/hudi-test/test-table/',
       'hoodie.database.name' = 'test_db',
       'hoodie.table.name' = 'test_table',
       'write.operation' = 'bulk_insert',
       'table.type' = 'MERGE_ON_READ',
       'index.type' = 'BUCKET',
       'hoodie.bucket.index.num.buckets' = '10',
       'hoodie.index.bucket.engine' = 'SIMPLE',
       'hoodie.clean.automatic' = 'true',
       'hoodie.cleaner.parallelism' = '200',
       'clean.policy' = 'KEEP_LATEST_FILE_VERSIONS',
       'clean.async.enabled' = 'true',
       'clean.retain_commits' = '2',
       'hoodie.datasource.write.hive_style_partitioning' = 'true',
       'hoodie.parquet.compression.codec' = 'snappy',
       'write.merge.max_memory' = '2048',
       'write.task.max.size' = '4096',
       'hoodie.memory.merge.max.size' = '2004857600000',
       'compaction.max_memory' = '3000',
       'hoodie.parquet.small.file.limit' = '104857600',
       'hoodie.write.set.null.for.missing.columns' = 'true',
       'hoodie.archive.automatic' = 'true',
       'hoodie.archive.async' = 'true',
       'hoodie.schema.on.read.enable' = 'true',
       'metadata.enabled' = 'true',
       'compaction.async.enabled' = 'false',
       'compaction.delta_commits' = '1',
       'hoodie.compaction.logfile.num.threshold' = '0',
       'compaction.schedule.enabled' = 'true',
       'compaction.trigger.strategy' = 'num_commits',
       'hoodie.compaction.strategy' = 
'org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy'
   );
   
   -- Insert data using bulk_insert operation
   INSERT INTO test_table VALUES 
   ('key1', 'key2', 'value1', 'value2', 'value3', 2025);
   
   -- Job completes successfully, Flink table is automatically cleaned up
   -- Hudi data remains on disk with 5-column schema
   ```
   
   2. **Run Flink Batch Job 2 with 6 columns** (adding new_field) using 
**upsert**:
   
   ```sql
   -- Flink Job 2: Creates table with additional column using upsert operation
   CREATE TABLE test_table (
       id1 STRING,
       id2 STRING,
       field1 STRING,
       field2 STRING,
       field3 STRING,
       new_field STRING,  -- NEW COLUMN (6th column)
       yr INT,
       PRIMARY KEY (id1, id2) NOT ENFORCED
   ) PARTITIONED BY (yr) WITH (
       'connector' = 'hudi',
       'path' = 'file:///tmp/hudi-test/test-table/',  -- SAME PATH as Job 1
       'hoodie.database.name' = 'test_db',
       'hoodie.table.name' = 'test_table',
       'write.operation' = 'upsert',  -- CHANGED FROM bulk_insert TO upsert
       'table.type' = 'MERGE_ON_READ',
       'index.type' = 'BUCKET',
       'hoodie.bucket.index.num.buckets' = '10',
       'hoodie.index.bucket.engine' = 'SIMPLE',
       'hoodie.clean.automatic' = 'true',
       'hoodie.cleaner.parallelism' = '200',
       'clean.policy' = 'KEEP_LATEST_FILE_VERSIONS',
       'clean.async.enabled' = 'true',
       'clean.retain_commits' = '2',
       'hoodie.datasource.write.hive_style_partitioning' = 'true',
       'hoodie.parquet.compression.codec' = 'snappy',
       'write.merge.max_memory' = '2048',
       'write.task.max.size' = '4096',
       'hoodie.memory.merge.max.size' = '2004857600000',
       'compaction.max_memory' = '3000',
       'hoodie.parquet.small.file.limit' = '104857600',
       'hoodie.write.set.null.for.missing.columns' = 'true',
       'hoodie.archive.automatic' = 'true',
       'hoodie.archive.async' = 'true',
       'hoodie.schema.on.read.enable' = 'true',
       'metadata.enabled' = 'true',
       'compaction.async.enabled' = 'false',
       'compaction.delta_commits' = '1',
       'hoodie.compaction.logfile.num.threshold' = '0',
       'compaction.schedule.enabled' = 'true',
       'compaction.trigger.strategy' = 'num_commits',
       'hoodie.compaction.strategy' = 
'org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy'
   );
   
   -- This INSERT causes the error during UPSERT operation
   -- Error occurs when Hudi tries to read existing 5-column data for upsert 
processing
   INSERT INTO test_table VALUES 
   ('key1', 'key2', 'new_value1', 'new_value2', 'new_value3', 
'new_tracking_value', 2025);
   ```
   
   3. **The error occurs** during the INSERT, because the **upsert** path 
requires Hudi to read the existing 5-column Parquet files with the new 
6-column schema for merge processing.
   
   **Key Difference**: The error specifically occurs because:
   - **Job 1**: Uses `bulk_insert` (no read required, direct write)
   - **Job 2**: Uses `upsert` (requires reading existing data to check for 
duplicates and merge)
   
   **Expected behavior**
   
   The UPSERT operation should successfully handle schema evolution by:
   1. Reading existing 5-column data during upsert processing
   2. Adding NULL values for the missing `new_field` column
   3. Merging with new 6-column data based on primary keys
   4. Writing updated records without throwing ArrayIndexOutOfBoundsException
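   Step 2 above amounts to padding rows read with the old schema out to the new one. A minimal sketch of that reconciliation, again hypothetical (the class and method names `SchemaPadSketch` and `padToNewSchema` are invented, not Hudi API):

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class SchemaPadSketch {
       // Hypothetical sketch of the expected behavior: a row read with the old
       // 5-column schema is widened to the new 6-column schema, filling NULL
       // for any column that did not exist when the file was written.
       static Object[] padToNewSchema(Object[] oldRow,
                                      List<String> oldFields,
                                      List<String> newFields) {
           Object[] padded = new Object[newFields.size()];
           for (int i = 0; i < newFields.size(); i++) {
               int src = oldFields.indexOf(newFields.get(i));
               padded[i] = (src >= 0) ? oldRow[src] : null; // NULL for new_field
           }
           return padded;
       }

       public static void main(String[] args) {
           List<String> oldFields = Arrays.asList(
               "id1", "id2", "field1", "field2", "field3", "yr");
           List<String> newFields = Arrays.asList(
               "id1", "id2", "field1", "field2", "field3", "new_field", "yr");
           Object[] oldRow = {"key1", "key2", "value1", "value2", "value3", 2025};
           System.out.println(Arrays.toString(
               padToNewSchema(oldRow, oldFields, newFields)));
           // prints: [key1, key2, value1, value2, value3, null, 2025]
       }
   }
   ```

   With the old row widened this way, the merge on primary keys (step 3) can proceed against a uniform 6-column shape instead of indexing past the file's 5 physical columns.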
   
   **Environment Description**
   
   * Hudi version : 0.14.1
   * Flink version : 1.17.2
   * Spark version : N/A (using Flink)
   * Hive version : N/A
   * Hadoop version : 3.3.6
   * Java version: 11.0.22
   
   **Additional context**
   
   **Configurations Applied** (all unsuccessful):
   ```
   'hoodie.schema.on.read.enable' = 'true'
   'hoodie.write.set.null.for.missing.columns' = 'true'
   'hoodie.datasource.write.schema.allow.auto.evolution' = 'true'
   'hoodie.parquet.field.id.write.enabled' = 'true'
   'hoodie.avro.schema.validate' = 'false'
   'hoodie.schema.compatibility.check.enabled' = 'false'
   ```
   
   
   **Production Context**: This is blocking our ability to add new tracking 
fields to a high-volume data pipeline. 
   
   **Community References**:
   - Related JIRA: [HUDI-6103](https://issues.apache.org/jira/browse/HUDI-6103)
   
   **Stacktrace**
   
   ```
   java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 5
        at 
org.apache.hudi.io.storage.row.parquet.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:89)
        at 
org.apache.hudi.io.storage.row.parquet.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:69)
        at 
org.apache.hudi.common.table.read.HoodieFileGroupReader.getBaseFileReader(HoodieFileGroupReader.java:226)
        at 
org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:104)
        at 
org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:89)
        at 
org.apache.hudi.table.format.mor.MergeOnReadInputSplit.createReader(MergeOnReadInputSplit.java:105)
        at 
org.apache.hudi.table.format.mor.MergeOnReadInputFormat.createReader(MergeOnReadInputFormat.java:104)
        at 
org.apache.hudi.table.format.mor.MergeOnReadInputFormat.createReader(MergeOnReadInputFormat.java:42)
        at 
org.apache.flink.connector.file.src.reader.BulkFormat$Reader.readBatch(BulkFormat.java:289)
        at 
org.apache.flink.connector.file.src.reader.FileRecordFormat.readBatch(FileRecordFormat.java:79)
        at 
org.apache.flink.connector.file.src.reader.SourceReaderBase.readRecords(SourceReaderBase.java:132)
        at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   
   ---
   
   Any insights from the community would be greatly appreciated! We're happy to 
provide additional debugging information, logs, or test cases as needed. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
