This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a6301a807 [format] fix blob format reader may cause 
IndexOutOfBoundsException (#7496)
1a6301a807 is described below

commit 1a6301a8074e7a5a2076a6c5f194e7c94888a525
Author: Faiz <[email protected]>
AuthorDate: Sat Mar 21 13:37:55 2026 +0800

    [format] fix blob format reader may cause IndexOutOfBoundsException (#7496)
---
 .../org/apache/paimon/append/BlobTableTest.java    | 33 +++++++++++++++++
 .../org/apache/paimon/flink/BlobTableITCase.java   | 23 ++++++++++++
 .../apache/paimon/format/blob/BlobFileFormat.java  | 24 +++++++++++--
 .../paimon/format/blob/BlobFormatReader.java       | 15 ++++++--
 .../paimon/format/blob/BlobFileFormatTest.java     | 42 +++++++++++++++++++++-
 5 files changed, 131 insertions(+), 6 deletions(-)

diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 1dfc333f05..736ce904cf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -38,6 +38,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.system.RowTrackingTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -479,6 +480,38 @@ public class BlobTableTest extends TableTestBase {
                                 + "'.");
     }
 
+    @Test
+    public void testReadRowTrackingWithBlobProjection() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                Collections.singletonList(
+                        GenericRow.of(
+                                1,
+                                
BinaryString.fromString("test_row_id_projection"),
+                                new BlobData(blobBytes))));
+
+        // read from RowTrackingTable which appends _ROW_ID and 
_SEQUENCE_NUMBER to the schema
+        FileStoreTable fileStoreTable = getTableDefault();
+        RowTrackingTable rowTrackingTable = new 
RowTrackingTable(fileStoreTable);
+
+        // read with projection: only _ROW_ID and f2 (blob)
+        // row tracking schema indices: 0=f0, 1=f1, 2=f2, 3=_ROW_ID, 
4=_SEQUENCE_NUMBER
+        ReadBuilder projectedBuilder =
+                rowTrackingTable.newReadBuilder().withProjection(new int[] {3, 
2});
+        RecordReader<InternalRow> projectedReader =
+                
projectedBuilder.newRead().createReader(projectedBuilder.newScan().plan());
+        AtomicInteger projectedCount = new AtomicInteger(0);
+        projectedReader.forEachRemaining(
+                row -> {
+                    projectedCount.incrementAndGet();
+                    // field 0 = _ROW_ID
+                    assertThat(row.isNullAt(0)).isFalse();
+                    // field 1 = f2 (blob)
+                    assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes);
+                });
+        assertThat(projectedCount.get()).isEqualTo(1);
+    }
+
     private void createExternalStorageTable() throws Exception {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index a33b181158..29938f36be 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -276,6 +276,29 @@ public class BlobTableITCase extends CatalogITCaseBase {
         assertThat(blob.toData()).isEqualTo(new byte[] {72, 101, 108, 108, 
111});
     }
 
+    @Test
+    public void testRowTrackingWithBlobProjection() {
+        batchSql("INSERT INTO blob_table VALUES (1, 'paimon', X'48656C6C6F')");
+
+        // query _ROW_ID and blob field from row_tracking system table
+        // this previously caused ArrayIndexOutOfBoundsException because 
BlobFormatReader
+        // ignored the projectedRowType and always returned a single-field row
+        List<Row> result = batchSql("SELECT _ROW_ID, picture FROM 
blob_table$row_tracking");
+        assertThat(result).hasSize(1);
+        Row row = result.get(0);
+        assertThat(row.getField(0)).isNotNull();
+        assertThat((byte[]) row.getField(1)).isEqualTo(new byte[] {72, 101, 
108, 108, 111});
+
+        // also verify selecting only _ROW_ID works
+        List<Row> rowIdOnly = batchSql("SELECT _ROW_ID FROM 
blob_table$row_tracking");
+        assertThat(rowIdOnly).hasSize(1);
+        assertThat(rowIdOnly.get(0).getField(0)).isNotNull();
+
+        // verify selecting all columns from row_tracking works
+        List<Row> allColumns = batchSql("SELECT * FROM 
blob_table$row_tracking");
+        assertThat(allColumns).hasSize(1);
+    }
+
     @Test
     public void testBlobTypeSchemaEquals() throws Exception {
         // Step 1: Create a Paimon table with blob field via Flink SQL
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index 255139e85f..c65f76eeaa 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -36,6 +36,7 @@ import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -74,7 +75,7 @@ public class BlobFileFormat extends FileFormat {
             RowType dataSchemaRowType,
             RowType projectedRowType,
             @Nullable List<Predicate> filters) {
-        return new BlobFormatReaderFactory(blobAsDescriptor);
+        return new BlobFormatReaderFactory(blobAsDescriptor, projectedRowType);
     }
 
     @Override
@@ -114,9 +115,16 @@ public class BlobFileFormat extends FileFormat {
     private static class BlobFormatReaderFactory implements 
FormatReaderFactory {
 
         private final boolean blobAsDescriptor;
+        private final int fieldCount;
+        private final int blobIndex;
 
-        public BlobFormatReaderFactory(boolean blobAsDescriptor) {
+        public BlobFormatReaderFactory(boolean blobAsDescriptor, RowType 
projectedRowType) {
             this.blobAsDescriptor = blobAsDescriptor;
+            this.fieldCount = projectedRowType.getFieldCount();
+            this.blobIndex = findBlobFieldIndex(projectedRowType);
+            Preconditions.checkState(
+                    this.blobIndex >= 0,
+                    "Read type of a blob format does not contain any blob 
field.");
         }
 
         @Override
@@ -134,7 +142,17 @@ public class BlobFileFormat extends FileFormat {
                     in = null;
                 }
             }
-            return new BlobFormatReader(fileIO, filePath, fileMeta, in);
+
+            return new BlobFormatReader(fileIO, filePath, fileMeta, in, 
fieldCount, blobIndex);
+        }
+
+        private static int findBlobFieldIndex(RowType rowType) {
+            for (int i = 0; i < rowType.getFieldCount(); i++) {
+                if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
+                    return i;
+                }
+            }
+            return -1;
         }
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index 2ba0169cdc..801964b862 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -40,16 +40,25 @@ public class BlobFormatReader implements 
FileRecordReader<InternalRow> {
     private final String filePathString;
     private final BlobFileMeta fileMeta;
     private final @Nullable SeekableInputStream in;
+    private final int fieldCount;
+    private final int blobIndex;
 
     private boolean returned;
 
     public BlobFormatReader(
-            FileIO fileIO, Path filePath, BlobFileMeta fileMeta, @Nullable 
SeekableInputStream in) {
+            FileIO fileIO,
+            Path filePath,
+            BlobFileMeta fileMeta,
+            @Nullable SeekableInputStream in,
+            int fieldCount,
+            int blobIndex) {
         this.fileIO = fileIO;
         this.filePath = filePath;
         this.filePathString = filePath.toString();
         this.fileMeta = fileMeta;
         this.in = in;
+        this.fieldCount = fieldCount;
+        this.blobIndex = blobIndex;
         this.returned = false;
     }
 
@@ -95,7 +104,9 @@ public class BlobFormatReader implements 
FileRecordReader<InternalRow> {
                     }
                 }
                 currentPosition++;
-                return GenericRow.of(blob);
+                GenericRow row = new GenericRow(fieldCount);
+                row.setField(blobIndex, blob);
+                return row;
             }
 
             @Override
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
index 0132359c01..0b66e15c2b 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.BlobRef;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriter;
@@ -94,7 +95,7 @@ public class BlobFileFormatTest {
         }
 
         // read
-        FormatReaderFactory readerFactory = format.createReaderFactory(null, 
null, null);
+        FormatReaderFactory readerFactory = format.createReaderFactory(null, 
rowType, null);
         FormatReaderContext context =
                 new FormatReaderContext(fileIO, file, 
fileIO.getFileSize(file));
         List<byte[]> result = new ArrayList<>();
@@ -130,4 +131,43 @@ public class BlobFileFormatTest {
         // assert
         assertThat(result).containsOnly(blobs.get(2));
     }
+
+    @Test
+    public void testReadWithProjectedRowTypeContainingExtraFields() throws 
IOException {
+        BlobFileFormat format = new BlobFileFormat(false);
+        RowType writeRowType = RowType.of(DataTypes.BLOB());
+
+        // write blob data
+        List<byte[]> blobs = Arrays.asList("hello".getBytes(), 
"world".getBytes());
+        try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
+            FormatWriter formatWriter = 
format.createWriterFactory(writeRowType).create(out, null);
+            for (byte[] bytes : blobs) {
+                formatWriter.addElement(GenericRow.of(new BlobData(bytes)));
+            }
+            formatWriter.close();
+        }
+
+        // read with a projectedRowType that has extra fields (simulating 
_ROW_ID scenario)
+        // projectedRowType: <BIGINT, BLOB> — blob is at index 1
+        RowType projectedRowType = RowType.of(DataTypes.BIGINT(), 
DataTypes.BLOB());
+        FormatReaderFactory readerFactory =
+                format.createReaderFactory(null, projectedRowType, null);
+        FormatReaderContext context =
+                new FormatReaderContext(fileIO, file, 
fileIO.getFileSize(file));
+
+        List<InternalRow> rows = new ArrayList<>();
+        readerFactory.createReader(context).forEachRemaining(rows::add);
+
+        assertThat(rows).hasSize(2);
+        for (InternalRow row : rows) {
+            // row should have 2 fields
+            assertThat(row.getFieldCount()).isEqualTo(2);
+            // field 0 (BIGINT) should be null (default value)
+            assertThat(row.isNullAt(0)).isTrue();
+            // field 1 (BLOB) should contain data
+            assertThat(row.isNullAt(1)).isFalse();
+        }
+        
assertThat(rows.get(0).getBlob(1).toData()).isEqualTo("hello".getBytes());
+        
assertThat(rows.get(1).getBlob(1).toData()).isEqualTo("world".getBytes());
+    }
 }

Reply via email to