This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 72600f9bb5 [core] Fix blob read failure after alter table and
compaction (#7618)
72600f9bb5 is described below
commit 72600f9bb5edaeb7845c001b228de25caba6a7a8
Author: littlecoder04 <[email protected]>
AuthorDate: Fri Apr 10 16:18:23 2026 +0800
[core] Fix blob read failure after alter table and compaction (#7618)
Reading a blob table fails after ALTER TABLE SET and compaction with:
`java.lang.IllegalArgumentException: All files in this bunch should have
the same schema id.` This PR fixes the above issue by not checking the
schemaId for blob files and by disallowing renames of blob columns.
---
.../paimon/operation/DataEvolutionSplitRead.java | 8 +-
.../org/apache/paimon/schema/SchemaManager.java | 15 ++
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 80 +++++++
.../org/apache/paimon/append/BlobTableTest.java | 239 +++++++++++++++++++++
.../paimon/operation/DataEvolutionReadTest.java | 44 ++++
paimon-python/dev/run_mixed_tests.sh | 39 +++-
paimon-python/pypaimon/read/reader/field_bunch.py | 9 +-
paimon-python/pypaimon/schema/schema_manager.py | 13 ++
paimon-python/pypaimon/tests/blob_table_test.py | 25 +++
.../pypaimon/tests/e2e/java_py_read_write_test.py | 9 +
10 files changed, 473 insertions(+), 8 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index dd8433b1b4..c0e70ff4f8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -548,9 +548,11 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
}
}
if (!files.isEmpty()) {
- checkArgument(
- file.schemaId() == files.get(0).schemaId(),
- "All files in this bunch should have the same
schema id.");
+ if (!isBlobFile(file.fileName())) {
+ checkArgument(
+ file.schemaId() == files.get(0).schemaId(),
+ "All files in this bunch should have the same
schema id.");
+ }
checkArgument(
file.writeCols().equals(files.get(0).writeCols()),
"All files in this bunch should have the same
write columns.");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 549c0e96a8..670960889d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -42,6 +42,7 @@ import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
@@ -404,6 +405,7 @@ public class SchemaManager implements Serializable {
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
assertNotUpdatingPartitionKeys(oldTableSchema,
rename.fieldNames(), "rename");
+ assertNotRenamingBlobColumn(newFields, rename.fieldNames());
new NestedColumnModifier(rename.fieldNames(), lazyIdentifier) {
@Override
protected void updateLastColumn(
@@ -908,6 +910,19 @@ public class SchemaManager implements Serializable {
}
}
+ private static void assertNotRenamingBlobColumn(List<DataField> fields,
String[] fieldNames) {
+ if (fieldNames.length > 1) {
+ return;
+ }
+ String fieldName = fieldNames[0];
+ for (DataField field : fields) {
+ if (field.name().equals(fieldName) &&
field.type().is(DataTypeRoot.BLOB)) {
+ throw new UnsupportedOperationException(
+ String.format("Cannot rename BLOB column: [%s]",
fieldName));
+ }
+ }
+ }
+
private abstract static class NestedColumnModifier {
private final String[] updateFieldNames;
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index ca640aa68d..bd1e006ec7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -791,6 +791,86 @@ public class JavaPyE2ETest {
}
}
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testBlobWriteAlterCompact() throws Exception {
+ Identifier identifier = identifier("blob_alter_compact_test");
+ catalog.dropTable(identifier, true);
+ Schema schema =
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT())
+ .column("f1", DataTypes.STRING())
+ .column("f2", DataTypes.BLOB())
+ .option("target-file-size", "100 MB")
+ .option("row-tracking.enabled", "true")
+ .option("data-evolution.enabled", "true")
+ .option("compaction.min.file-num", "2")
+ .option("bucket", "-1")
+ .build();
+ catalog.createTable(identifier, schema, false);
+
+ byte[] blobBytes = new byte[1024];
+ new java.util.Random(42).nextBytes(blobBytes);
+
+ // Batch 1: write with schemaId=0
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ StreamTableWrite write =
+
table.newStreamWriteBuilder().withCommitUser(commitUser).newWrite();
+ StreamTableCommit commit = table.newCommit(commitUser);
+ for (int i = 0; i < 100; i++) {
+ write.write(
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("batch1"),
+ new org.apache.paimon.data.BlobData(blobBytes)));
+ }
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // ALTER TABLE SET -> schemaId becomes 1
+ catalog.alterTable(
+ identifier,
+
org.apache.paimon.schema.SchemaChange.setOption("snapshot.num-retained.min",
"5"),
+ false);
+
+ // Batch 2: write with schemaId=1
+ table = (FileStoreTable) catalog.getTable(identifier);
+ write =
table.newStreamWriteBuilder().withCommitUser(commitUser).newWrite();
+ commit = table.newCommit(commitUser);
+ for (int i = 0; i < 100; i++) {
+ write.write(
+ GenericRow.of(
+ 2,
+ BinaryString.fromString("batch2"),
+ new org.apache.paimon.data.BlobData(blobBytes)));
+ }
+ commit.commit(1, write.prepareCommit(false, 1));
+ write.close();
+ commit.close();
+
+ // Compact
+ table = (FileStoreTable) catalog.getTable(identifier);
+ org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator
coordinator =
+ new
org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator(
+ table, false, false);
+ List<org.apache.paimon.append.dataevolution.DataEvolutionCompactTask>
tasks =
+ coordinator.plan();
+ assertThat(tasks.size()).isGreaterThan(0);
+ List<org.apache.paimon.table.sink.CommitMessage> compactMessages = new
ArrayList<>();
+ for (org.apache.paimon.append.dataevolution.DataEvolutionCompactTask
task : tasks) {
+ compactMessages.add(task.doCompact(table, commitUser));
+ }
+ StreamTableCommit compactCommit = table.newCommit(commitUser);
+ compactCommit.commit(2, compactMessages);
+ compactCommit.close();
+
+ FileStoreTable readTable = (FileStoreTable)
catalog.getTable(identifier);
+ List<Split> splits = new
ArrayList<>(readTable.newSnapshotReader().read().dataSplits());
+ TableRead read = readTable.newRead();
+ RowType rowType = readTable.rowType();
+ List<String> res = getResult(read, splits, row ->
internalRowToString(row, rowType));
+ assertThat(res).hasSize(200);
+ }
+
// Helper method from TableTestBase
protected Identifier identifier(String tableName) {
return new Identifier(database, tableName);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 736ce904cf..e3019485bb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -19,6 +19,8 @@
package org.apache.paimon.append;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
@@ -33,10 +35,14 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.DataEvolutionSplitRead;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.RowTrackingTable;
import org.apache.paimon.types.DataField;
@@ -45,6 +51,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.UriReader;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -512,6 +519,238 @@ public class BlobTableTest extends TableTestBase {
assertThat(projectedCount.get()).isEqualTo(1);
}
+ @Test
+ void testReadBlobAfterAlterTableAndCompaction() throws Exception {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+ catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+ // Step 1: write data with schemaId=0
+ commitDefault(writeDataDefault(100, 1));
+
+ // Step 2: ALTER TABLE SET an unrelated option -> schemaId becomes 1
+ catalog.alterTable(
+ identifier(),
SchemaChange.setOption("snapshot.num-retained.min", "5"), false);
+
+ // Step 3: write more data with schemaId=1
+ commitDefault(writeDataDefault(100, 1));
+
+ // Step 4: compact blob table using DataEvolutionCompactCoordinator
+ FileStoreTable table = getTableDefault();
+ DataEvolutionCompactCoordinator coordinator =
+ new DataEvolutionCompactCoordinator(table, false, false);
+ List<DataEvolutionCompactTask> tasks = coordinator.plan();
+ assertThat(tasks.size()).isGreaterThan(0);
+ List<CommitMessage> compactMessages = new ArrayList<>();
+ for (DataEvolutionCompactTask task : tasks) {
+ compactMessages.add(task.doCompact(table, commitUser));
+ }
+ commitDefault(compactMessages);
+
+ // Step 5: read after compaction
+ readDefault(row ->
assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes));
+ }
+
+ @Test
+ void testReadBlobAfterAddColumnAndCompaction() throws Exception {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+ catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+ {
+ FileStoreTable t = getTableDefault();
+ StreamWriteBuilder b =
t.newStreamWriteBuilder().withCommitUser(commitUser);
+ try (StreamTableWrite w = b.newWrite()) {
+ for (int j = 0; j < 100; j++) {
+ w.write(
+ GenericRow.of(
+ 1, BinaryString.fromString("batch1"), new
BlobData(blobBytes)));
+ }
+ commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+ }
+ }
+
+ catalog.alterTable(identifier(), SchemaChange.addColumn("f3",
DataTypes.STRING()), false);
+
+ {
+ FileStoreTable t = getTableDefault();
+ StreamWriteBuilder b =
t.newStreamWriteBuilder().withCommitUser(commitUser);
+ try (StreamTableWrite w = b.newWrite()) {
+ for (int j = 0; j < 100; j++) {
+ w.write(
+ GenericRow.of(
+ 2,
+ BinaryString.fromString("batch2"),
+ new BlobData(blobBytes),
+ BinaryString.fromString("after-add")));
+ }
+ commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+ }
+ }
+
+ FileStoreTable table = getTableDefault();
+ DataEvolutionCompactCoordinator coordinator =
+ new DataEvolutionCompactCoordinator(table, false, false);
+ List<DataEvolutionCompactTask> tasks = coordinator.plan();
+ assertThat(tasks.size()).isGreaterThan(0);
+ List<CommitMessage> compactMessages = new ArrayList<>();
+ for (DataEvolutionCompactTask task : tasks) {
+ compactMessages.add(task.doCompact(table, commitUser));
+ }
+ commitDefault(compactMessages);
+
+ AtomicInteger batch1Count = new AtomicInteger(0);
+ AtomicInteger batch2Count = new AtomicInteger(0);
+ readDefault(
+ row -> {
+ assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+ if (row.getInt(0) == 1) {
+ batch1Count.incrementAndGet();
+ } else if (row.getInt(0) == 2) {
+ batch2Count.incrementAndGet();
+ }
+ });
+ assertThat(batch1Count.get()).isEqualTo(100);
+ assertThat(batch2Count.get()).isEqualTo(100);
+ }
+
+ @Test
+ void testReadBlobAfterDropColumnAndCompaction() throws Exception {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.column("f3", DataTypes.STRING());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+ catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+ {
+ FileStoreTable t = getTableDefault();
+ StreamWriteBuilder b =
t.newStreamWriteBuilder().withCommitUser(commitUser);
+ try (StreamTableWrite w = b.newWrite()) {
+ for (int j = 0; j < 100; j++) {
+ w.write(
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("batch1"),
+ new BlobData(blobBytes),
+ BinaryString.fromString("before-drop")));
+ }
+ commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+ }
+ }
+
+ catalog.alterTable(identifier(), SchemaChange.dropColumn("f3"), false);
+
+ {
+ FileStoreTable t = getTableDefault();
+ StreamWriteBuilder b =
t.newStreamWriteBuilder().withCommitUser(commitUser);
+ try (StreamTableWrite w = b.newWrite()) {
+ for (int j = 0; j < 100; j++) {
+ w.write(
+ GenericRow.of(
+ 2, BinaryString.fromString("batch2"), new
BlobData(blobBytes)));
+ }
+ commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+ }
+ }
+
+ FileStoreTable table = getTableDefault();
+ DataEvolutionCompactCoordinator coordinator =
+ new DataEvolutionCompactCoordinator(table, false, false);
+ List<DataEvolutionCompactTask> tasks = coordinator.plan();
+ assertThat(tasks.size()).isGreaterThan(0);
+ List<CommitMessage> compactMessages = new ArrayList<>();
+ for (DataEvolutionCompactTask task : tasks) {
+ compactMessages.add(task.doCompact(table, commitUser));
+ }
+ commitDefault(compactMessages);
+
+ AtomicInteger batch1Count = new AtomicInteger(0);
+ AtomicInteger batch2Count = new AtomicInteger(0);
+ readDefault(
+ row -> {
+ assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+ if (row.getInt(0) == 1) {
+ batch1Count.incrementAndGet();
+ } else if (row.getInt(0) == 2) {
+ batch2Count.incrementAndGet();
+ }
+ });
+ assertThat(batch1Count.get()).isEqualTo(100);
+ assertThat(batch2Count.get()).isEqualTo(100);
+ }
+
+ @Disabled("Reproduce: rename blob column causes read failure after
compaction")
+ @Test
+ void testRenameBlobColumnReadFailure() throws Exception {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+ catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+ // Step 1: write blob data — blob files record writeCols=["f2"]
+ commitDefault(writeDataDefault(100, 1));
+
+ // Step 2: rename blob column f2 -> f2_renamed
+ catalog.alterTable(identifier(), SchemaChange.renameColumn("f2",
"f2_renamed"), false);
+
+ // Step 3: write more data — new blob files have
writeCols=["f2_renamed"]
+ commitDefault(writeDataDefault(100, 1));
+
+ // Step 4: compact merges files into the same split
+ FileStoreTable table = getTableDefault();
+ DataEvolutionCompactCoordinator coordinator =
+ new DataEvolutionCompactCoordinator(table, false, false);
+ List<DataEvolutionCompactTask> tasks = coordinator.plan();
+ assertThat(tasks.size()).isGreaterThan(0);
+ List<CommitMessage> compactMessages = new ArrayList<>();
+ for (DataEvolutionCompactTask task : tasks) {
+ compactMessages.add(task.doCompact(table, commitUser));
+ }
+ commitDefault(compactMessages);
+
+ // Step 5: read fails —
+ assertThatThrownBy(() -> readDefault(row -> {}))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("All files in this bunch should have the
same write columns");
+ }
+
+ @Test
+ void testRenameBlobColumnShouldFail() throws Exception {
+ createTableDefault();
+ commitDefault(writeDataDefault(10, 1));
+
+ assertThatThrownBy(
+ () ->
+ catalog.alterTable(
+ identifier(),
+ SchemaChange.renameColumn("f2",
"f2_renamed"),
+ false))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Cannot rename BLOB column");
+ }
+
private void createExternalStorageTable() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index 4e85b1e3dd..9bb9a7274e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -297,6 +297,36 @@ public class DataEvolutionReadTest {
fileName, firstRowId, rowCount, maxSequenceNumber,
Arrays.asList("blob_col"));
}
+ /** Creates a blob file with a specified schemaId. */
+ private DataFileMeta createBlobFileWithSchema(
+ String fileName,
+ long firstRowId,
+ long rowCount,
+ long maxSequenceNumber,
+ long schemaId) {
+ return DataFileMeta.create(
+ fileName + ".blob",
+ rowCount,
+ rowCount,
+ DataFileMeta.EMPTY_MIN_KEY,
+ DataFileMeta.EMPTY_MAX_KEY,
+ SimpleStats.EMPTY_STATS,
+ SimpleStats.EMPTY_STATS,
+ 0,
+ maxSequenceNumber,
+ schemaId,
+ DataFileMeta.DUMMY_LEVEL,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(System.currentTimeMillis()),
+ rowCount,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ firstRowId,
+ Arrays.asList("blob_col"));
+ }
+
/** Creates a blob file with custom write columns. */
private DataFileMeta createBlobFileWithCols(
String fileName,
@@ -327,6 +357,20 @@ public class DataEvolutionReadTest {
writeCols);
}
+ @Test
+ void testAddBlobFilesWithDifferentSchemaId() {
+ DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1,
0L);
+ DataFileMeta blobEntry2 = createBlobFileWithSchema("blob2", 100, 200,
1, 1L);
+
+ blobBunch.add(blobEntry1);
+ assertThatCode(() ->
blobBunch.add(blobEntry2)).doesNotThrowAnyException();
+
+ assertThat(blobBunch.files).hasSize(2);
+ assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L);
+ assertThat(blobBunch.files.get(1).schemaId()).isEqualTo(1L);
+ assertThat(blobBunch.rowCount()).isEqualTo(300);
+ }
+
@Test
public void testRowIdPushDown() {
SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE,
true);
diff --git a/paimon-python/dev/run_mixed_tests.sh
b/paimon-python/dev/run_mixed_tests.sh
index f98bf32747..3366a5ff1c 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -290,6 +290,29 @@ run_lumina_vector_test() {
fi
}
+run_blob_alter_compact_test() {
+ echo -e "${YELLOW}=== Running Blob Alter+Compact Test (Java
Write+Alter+Compact, Python Read) ===${NC}"
+
+ cd "$PROJECT_ROOT"
+
+ echo "Running Maven test for JavaPyE2ETest.testBlobWriteAlterCompact..."
+ if mvn test
-Dtest=org.apache.paimon.JavaPyE2ETest#testBlobWriteAlterCompact -pl
paimon-core -q -Drun.e2e.tests=true; then
+ echo -e "${GREEN}✓ Java blob write+alter+compact test completed
successfully${NC}"
+ else
+ echo -e "${RED}✗ Java blob write+alter+compact test failed${NC}"
+ return 1
+ fi
+ cd "$PAIMON_PYTHON_DIR"
+ echo "Running Python test for
JavaPyReadWriteTest.test_read_blob_after_alter_and_compact..."
+ if python -m pytest
java_py_read_write_test.py::JavaPyReadWriteTest::test_read_blob_after_alter_and_compact
-v; then
+ echo -e "${GREEN}✓ Python blob read test completed successfully${NC}"
+ return 0
+ else
+ echo -e "${RED}✗ Python blob read test failed${NC}"
+ return 1
+ fi
+}
+
# Main execution
main() {
local java_write_result=0
@@ -301,6 +324,7 @@ main() {
local compressed_text_result=0
local tantivy_fulltext_result=0
local lumina_vector_result=0
+ local blob_alter_compact_result=0
# Detect Python version
PYTHON_VERSION=$(python -c "import sys;
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null ||
echo "unknown")
@@ -383,6 +407,13 @@ main() {
echo ""
+ # Run blob alter+compact test (Java write+alter+compact, Python read)
+ if ! run_blob_alter_compact_test; then
+ blob_alter_compact_result=1
+ fi
+
+ echo ""
+
echo -e "${YELLOW}=== Test Results Summary ===${NC}"
if [[ $java_write_result -eq 0 ]]; then
@@ -439,12 +470,18 @@ main() {
echo -e "${RED}✗ Lumina Vector Index Test (Java Write, Python Read):
FAILED${NC}"
fi
+ if [[ $blob_alter_compact_result -eq 0 ]]; then
+ echo -e "${GREEN}✓ Blob Alter+Compact Test (Java Write+Alter+Compact,
Python Read): PASSED${NC}"
+ else
+ echo -e "${RED}✗ Blob Alter+Compact Test (Java Write+Alter+Compact,
Python Read): FAILED${NC}"
+ fi
+
echo ""
# Clean up warehouse directory after all tests
cleanup_warehouse
- if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$btree_index_result -eq 0 && $compressed_text_result -eq 0 &&
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 ]]; then
+ if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$btree_index_result -eq 0 && $compressed_text_result -eq 0 &&
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 &&
$blob_alter_compact_result -eq 0 ]]; then
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability
verified.${NC}"
return 0
else
diff --git a/paimon-python/pypaimon/read/reader/field_bunch.py
b/paimon-python/pypaimon/read/reader/field_bunch.py
index fccf2cb325..f9f2dca3ce 100644
--- a/paimon-python/pypaimon/read/reader/field_bunch.py
+++ b/paimon-python/pypaimon/read/reader/field_bunch.py
@@ -99,10 +99,11 @@ class BlobBunch(FieldBunch):
)
if self._files:
- if file.schema_id != self._files[0].schema_id:
- raise ValueError(
- "All files in a blob bunch should have the same schema
id."
- )
+ if not DataFileMeta.is_blob_file(file.file_name):
+ if file.schema_id != self._files[0].schema_id:
+ raise ValueError(
+ "All files in a blob bunch should have the same
schema id."
+ )
if file.write_cols != self._files[0].write_cols:
raise ValueError(
"All files in a blob bunch should have the same write
columns."
diff --git a/paimon-python/pypaimon/schema/schema_manager.py
b/paimon-python/pypaimon/schema/schema_manager.py
index 0a18c77630..721925dc33 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -136,6 +136,18 @@ def _assert_not_updating_primary_keys(
raise ValueError(f"Cannot {operation} primary key")
+def _assert_not_renaming_blob_column(
+ new_fields: List[DataField], field_names: List[str]):
+ if len(field_names) > 1:
+ return
+ field_name = field_names[0]
+ for field in new_fields:
+ if field.name == field_name and str(field.type) == 'BLOB':
+ raise ValueError(
+ f"Cannot rename BLOB column: [{field_name}]"
+ )
+
+
def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]):
field_name = change.field_names[-1]
new_name = change.new_name
@@ -355,6 +367,7 @@ class SchemaManager:
_assert_not_updating_partition_keys(
old_table_schema, change.field_names, "rename"
)
+ _assert_not_renaming_blob_column(new_fields,
change.field_names)
_handle_rename_column(change, new_fields)
elif isinstance(change, DropColumn):
_drop_column_validation(old_table_schema, change)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py
b/paimon-python/pypaimon/tests/blob_table_test.py
index 0d3bf49cbd..7e4d6c6d26 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -24,6 +24,7 @@ import unittest
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
+from pypaimon.schema.schema_change import SchemaChange
from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.write.commit_message import CommitMessage
@@ -3024,6 +3025,30 @@ class DataBlobWriterTest(unittest.TestCase):
result = read.to_arrow(splits)
self.assertEqual(result.num_rows, 1)
+ def test_rename_blob_column_should_fail(self):
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_col', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ }
+ )
+ self.catalog.create_table('test_db.blob_rename_test', schema, False)
+
+ with self.assertRaises(RuntimeError) as ctx:
+ self.catalog.alter_table(
+ 'test_db.blob_rename_test',
+ [SchemaChange.rename_column('blob_col', 'blob_col_renamed')],
+ False
+ )
+ self.assertIn('Cannot rename BLOB column', str(ctx.exception))
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 5cc7d54925..4a61b9a067 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -602,3 +602,12 @@ class JavaPyReadWriteTest(unittest.TestCase):
ids = pa_table.column('id').to_pylist()
print(f"Lumina vector search matched rows: ids={ids}")
self.assertIn(0, ids)
+
+ def test_read_blob_after_alter_and_compact(self):
+ table = self.catalog.get_table('default.blob_alter_compact_test')
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ splits = table_scan.plan().splits()
+ result = table_read.to_arrow(splits)
+ self.assertEqual(result.num_rows, 200)