This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1a6301a807 [format] fix blob format reader may cause
IndexOutOfBoundsException (#7496)
1a6301a807 is described below
commit 1a6301a8074e7a5a2076a6c5f194e7c94888a525
Author: Faiz <[email protected]>
AuthorDate: Sat Mar 21 13:37:55 2026 +0800
[format] fix blob format reader may cause IndexOutOfBoundsException (#7496)
---
.../org/apache/paimon/append/BlobTableTest.java | 33 +++++++++++++++++
.../org/apache/paimon/flink/BlobTableITCase.java | 23 ++++++++++++
.../apache/paimon/format/blob/BlobFileFormat.java | 24 +++++++++++--
.../paimon/format/blob/BlobFormatReader.java | 15 ++++++--
.../paimon/format/blob/BlobFileFormatTest.java | 42 +++++++++++++++++++++-
5 files changed, 131 insertions(+), 6 deletions(-)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 1dfc333f05..736ce904cf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -38,6 +38,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.system.RowTrackingTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -479,6 +480,38 @@ public class BlobTableTest extends TableTestBase {
+ "'.");
}
+ @Test
+ public void testReadRowTrackingWithBlobProjection() throws Exception {
+ createTableDefault();
+ writeDataDefault(
+ Collections.singletonList(
+ GenericRow.of(
+ 1,
+
BinaryString.fromString("test_row_id_projection"),
+ new BlobData(blobBytes))));
+
+ // read from RowTrackingTable which appends _ROW_ID and
_SEQUENCE_NUMBER to the schema
+ FileStoreTable fileStoreTable = getTableDefault();
+ RowTrackingTable rowTrackingTable = new
RowTrackingTable(fileStoreTable);
+
+ // read with projection: only _ROW_ID and f2 (blob)
+ // row tracking schema indices: 0=f0, 1=f1, 2=f2, 3=_ROW_ID,
4=_SEQUENCE_NUMBER
+ ReadBuilder projectedBuilder =
+ rowTrackingTable.newReadBuilder().withProjection(new int[] {3,
2});
+ RecordReader<InternalRow> projectedReader =
+
projectedBuilder.newRead().createReader(projectedBuilder.newScan().plan());
+ AtomicInteger projectedCount = new AtomicInteger(0);
+ projectedReader.forEachRemaining(
+ row -> {
+ projectedCount.incrementAndGet();
+ // field 0 = _ROW_ID
+ assertThat(row.isNullAt(0)).isFalse();
+ // field 1 = f2 (blob)
+ assertThat(row.getBlob(1).toData()).isEqualTo(blobBytes);
+ });
+ assertThat(projectedCount.get()).isEqualTo(1);
+ }
+
private void createExternalStorageTable() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index a33b181158..29938f36be 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -276,6 +276,29 @@ public class BlobTableITCase extends CatalogITCaseBase {
assertThat(blob.toData()).isEqualTo(new byte[] {72, 101, 108, 108,
111});
}
+ @Test
+ public void testRowTrackingWithBlobProjection() {
+ batchSql("INSERT INTO blob_table VALUES (1, 'paimon', X'48656C6C6F')");
+
+ // query _ROW_ID and blob field from row_tracking system table
+ // this previously caused ArrayIndexOutOfBoundsException because
BlobFormatReader
+ // ignored the projectedRowType and always returned a single-field row
+ List<Row> result = batchSql("SELECT _ROW_ID, picture FROM
blob_table$row_tracking");
+ assertThat(result).hasSize(1);
+ Row row = result.get(0);
+ assertThat(row.getField(0)).isNotNull();
+ assertThat((byte[]) row.getField(1)).isEqualTo(new byte[] {72, 101,
108, 108, 111});
+
+ // also verify selecting only _ROW_ID works
+ List<Row> rowIdOnly = batchSql("SELECT _ROW_ID FROM
blob_table$row_tracking");
+ assertThat(rowIdOnly).hasSize(1);
+ assertThat(rowIdOnly.get(0).getField(0)).isNotNull();
+
+ // verify selecting all columns from row_tracking works
+ List<Row> allColumns = batchSql("SELECT * FROM
blob_table$row_tracking");
+ assertThat(allColumns).hasSize(1);
+ }
+
@Test
public void testBlobTypeSchemaEquals() throws Exception {
// Step 1: Create a Paimon table with blob field via Flink SQL
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index 255139e85f..c65f76eeaa 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -36,6 +36,7 @@ import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
@@ -74,7 +75,7 @@ public class BlobFileFormat extends FileFormat {
RowType dataSchemaRowType,
RowType projectedRowType,
@Nullable List<Predicate> filters) {
- return new BlobFormatReaderFactory(blobAsDescriptor);
+ return new BlobFormatReaderFactory(blobAsDescriptor, projectedRowType);
}
@Override
@@ -114,9 +115,16 @@ public class BlobFileFormat extends FileFormat {
private static class BlobFormatReaderFactory implements
FormatReaderFactory {
private final boolean blobAsDescriptor;
+ private final int fieldCount;
+ private final int blobIndex;
- public BlobFormatReaderFactory(boolean blobAsDescriptor) {
+ public BlobFormatReaderFactory(boolean blobAsDescriptor, RowType
projectedRowType) {
this.blobAsDescriptor = blobAsDescriptor;
+ this.fieldCount = projectedRowType.getFieldCount();
+ this.blobIndex = findBlobFieldIndex(projectedRowType);
+ Preconditions.checkState(
+ this.blobIndex >= 0,
+ "Read type of a blob format does not contain any blob
field.");
}
@Override
@@ -134,7 +142,17 @@ public class BlobFileFormat extends FileFormat {
in = null;
}
}
- return new BlobFormatReader(fileIO, filePath, fileMeta, in);
+
+ return new BlobFormatReader(fileIO, filePath, fileMeta, in,
fieldCount, blobIndex);
+ }
+
+ private static int findBlobFieldIndex(RowType rowType) {
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
+ return i;
+ }
+ }
+ return -1;
}
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index 2ba0169cdc..801964b862 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -40,16 +40,25 @@ public class BlobFormatReader implements
FileRecordReader<InternalRow> {
private final String filePathString;
private final BlobFileMeta fileMeta;
private final @Nullable SeekableInputStream in;
+ private final int fieldCount;
+ private final int blobIndex;
private boolean returned;
public BlobFormatReader(
- FileIO fileIO, Path filePath, BlobFileMeta fileMeta, @Nullable
SeekableInputStream in) {
+ FileIO fileIO,
+ Path filePath,
+ BlobFileMeta fileMeta,
+ @Nullable SeekableInputStream in,
+ int fieldCount,
+ int blobIndex) {
this.fileIO = fileIO;
this.filePath = filePath;
this.filePathString = filePath.toString();
this.fileMeta = fileMeta;
this.in = in;
+ this.fieldCount = fieldCount;
+ this.blobIndex = blobIndex;
this.returned = false;
}
@@ -95,7 +104,9 @@ public class BlobFormatReader implements
FileRecordReader<InternalRow> {
}
}
currentPosition++;
- return GenericRow.of(blob);
+ GenericRow row = new GenericRow(fieldCount);
+ row.setField(blobIndex, blob);
+ return row;
}
@Override
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
index 0132359c01..0b66e15c2b 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.BlobRef;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
@@ -94,7 +95,7 @@ public class BlobFileFormatTest {
}
// read
- FormatReaderFactory readerFactory = format.createReaderFactory(null,
null, null);
+ FormatReaderFactory readerFactory = format.createReaderFactory(null,
rowType, null);
FormatReaderContext context =
new FormatReaderContext(fileIO, file,
fileIO.getFileSize(file));
List<byte[]> result = new ArrayList<>();
@@ -130,4 +131,43 @@ public class BlobFileFormatTest {
// assert
assertThat(result).containsOnly(blobs.get(2));
}
+
+ @Test
+ public void testReadWithProjectedRowTypeContainingExtraFields() throws
IOException {
+ BlobFileFormat format = new BlobFileFormat(false);
+ RowType writeRowType = RowType.of(DataTypes.BLOB());
+
+ // write blob data
+ List<byte[]> blobs = Arrays.asList("hello".getBytes(),
"world".getBytes());
+ try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
+ FormatWriter formatWriter =
format.createWriterFactory(writeRowType).create(out, null);
+ for (byte[] bytes : blobs) {
+ formatWriter.addElement(GenericRow.of(new BlobData(bytes)));
+ }
+ formatWriter.close();
+ }
+
+ // read with a projectedRowType that has extra fields (simulating
_ROW_ID scenario)
+ // projectedRowType: <BIGINT, BLOB> — blob is at index 1
+ RowType projectedRowType = RowType.of(DataTypes.BIGINT(),
DataTypes.BLOB());
+ FormatReaderFactory readerFactory =
+ format.createReaderFactory(null, projectedRowType, null);
+ FormatReaderContext context =
+ new FormatReaderContext(fileIO, file,
fileIO.getFileSize(file));
+
+ List<InternalRow> rows = new ArrayList<>();
+ readerFactory.createReader(context).forEachRemaining(rows::add);
+
+ assertThat(rows).hasSize(2);
+ for (InternalRow row : rows) {
+ // row should have 2 fields
+ assertThat(row.getFieldCount()).isEqualTo(2);
+ // field 0 (BIGINT) should be null (default value)
+ assertThat(row.isNullAt(0)).isTrue();
+ // field 1 (BLOB) should contain data
+ assertThat(row.isNullAt(1)).isFalse();
+ }
+
assertThat(rows.get(0).getBlob(1).toData()).isEqualTo("hello".getBytes());
+
assertThat(rows.get(1).getBlob(1).toData()).isEqualTo("world".getBytes());
+ }
}