This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit c258aea8e931506f409f4c301b7df91fbb3ca68a
Author: littlecoder04 <[email protected]>
AuthorDate: Fri Apr 10 16:18:23 2026 +0800

    [core] Fix blob read failure after alter table and compaction (#7618)
    
    Reading a blob table fails after ALTER TABLE SET and compaction with:
    `java.lang.IllegalArgumentException: All files in this bunch should have
    the same schema id.` This PR fixes the above issue by not checking the
    schemaId for blob files and by disallowing renames of blob columns.
---
 .../paimon/operation/DataEvolutionSplitRead.java   |   8 +-
 .../org/apache/paimon/schema/SchemaManager.java    |  15 ++
 .../test/java/org/apache/paimon/JavaPyE2ETest.java |  80 +++++++
 .../org/apache/paimon/append/BlobTableTest.java    | 239 +++++++++++++++++++++
 .../paimon/operation/DataEvolutionReadTest.java    |  44 ++++
 paimon-python/dev/run_mixed_tests.sh               |  41 +++-
 paimon-python/pypaimon/read/reader/field_bunch.py  |   9 +-
 paimon-python/pypaimon/schema/schema_manager.py    |  13 ++
 paimon-python/pypaimon/tests/blob_table_test.py    |  25 +++
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |   9 +
 10 files changed, 474 insertions(+), 9 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index dd8433b1b4..c0e70ff4f8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -548,9 +548,11 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                     }
                 }
                 if (!files.isEmpty()) {
-                    checkArgument(
-                            file.schemaId() == files.get(0).schemaId(),
-                            "All files in this bunch should have the same 
schema id.");
+                    if (!isBlobFile(file.fileName())) {
+                        checkArgument(
+                                file.schemaId() == files.get(0).schemaId(),
+                                "All files in this bunch should have the same 
schema id.");
+                    }
                     checkArgument(
                             file.writeCols().equals(files.get(0).writeCols()),
                             "All files in this bunch should have the same 
write columns.");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 8667f2271d..3aca4b762b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -42,6 +42,7 @@ import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
@@ -404,6 +405,7 @@ public class SchemaManager implements Serializable {
             } else if (change instanceof RenameColumn) {
                 RenameColumn rename = (RenameColumn) change;
                 assertNotUpdatingPartitionKeys(oldTableSchema, 
rename.fieldNames(), "rename");
+                assertNotRenamingBlobColumn(newFields, rename.fieldNames());
                 new NestedColumnModifier(rename.fieldNames(), lazyIdentifier) {
                     @Override
                     protected void updateLastColumn(
@@ -908,6 +910,19 @@ public class SchemaManager implements Serializable {
         }
     }
 
+    private static void assertNotRenamingBlobColumn(List<DataField> fields, 
String[] fieldNames) {
+        if (fieldNames.length > 1) {
+            return;
+        }
+        String fieldName = fieldNames[0];
+        for (DataField field : fields) {
+            if (field.name().equals(fieldName) && 
field.type().is(DataTypeRoot.BLOB)) {
+                throw new UnsupportedOperationException(
+                        String.format("Cannot rename BLOB column: [%s]", 
fieldName));
+            }
+        }
+    }
+
     private abstract static class NestedColumnModifier {
 
         private final String[] updateFieldNames;
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java 
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index ca640aa68d..bd1e006ec7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -791,6 +791,86 @@ public class JavaPyE2ETest {
         }
     }
 
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testBlobWriteAlterCompact() throws Exception {
+        Identifier identifier = identifier("blob_alter_compact_test");
+        catalog.dropTable(identifier, true);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.STRING())
+                        .column("f2", DataTypes.BLOB())
+                        .option("target-file-size", "100 MB")
+                        .option("row-tracking.enabled", "true")
+                        .option("data-evolution.enabled", "true")
+                        .option("compaction.min.file-num", "2")
+                        .option("bucket", "-1")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+
+        byte[] blobBytes = new byte[1024];
+        new java.util.Random(42).nextBytes(blobBytes);
+
+        // Batch 1: write with schemaId=0
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        StreamTableWrite write =
+                
table.newStreamWriteBuilder().withCommitUser(commitUser).newWrite();
+        StreamTableCommit commit = table.newCommit(commitUser);
+        for (int i = 0; i < 100; i++) {
+            write.write(
+                    GenericRow.of(
+                            1,
+                            BinaryString.fromString("batch1"),
+                            new org.apache.paimon.data.BlobData(blobBytes)));
+        }
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // ALTER TABLE SET -> schemaId becomes 1
+        catalog.alterTable(
+                identifier,
+                
org.apache.paimon.schema.SchemaChange.setOption("snapshot.num-retained.min", 
"5"),
+                false);
+
+        // Batch 2: write with schemaId=1
+        table = (FileStoreTable) catalog.getTable(identifier);
+        write = 
table.newStreamWriteBuilder().withCommitUser(commitUser).newWrite();
+        commit = table.newCommit(commitUser);
+        for (int i = 0; i < 100; i++) {
+            write.write(
+                    GenericRow.of(
+                            2,
+                            BinaryString.fromString("batch2"),
+                            new org.apache.paimon.data.BlobData(blobBytes)));
+        }
+        commit.commit(1, write.prepareCommit(false, 1));
+        write.close();
+        commit.close();
+
+        // Compact
+        table = (FileStoreTable) catalog.getTable(identifier);
+        org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator 
coordinator =
+                new 
org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator(
+                        table, false, false);
+        List<org.apache.paimon.append.dataevolution.DataEvolutionCompactTask> 
tasks =
+                coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<org.apache.paimon.table.sink.CommitMessage> compactMessages = new 
ArrayList<>();
+        for (org.apache.paimon.append.dataevolution.DataEvolutionCompactTask 
task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        StreamTableCommit compactCommit = table.newCommit(commitUser);
+        compactCommit.commit(2, compactMessages);
+        compactCommit.close();
+
+        FileStoreTable readTable = (FileStoreTable) 
catalog.getTable(identifier);
+        List<Split> splits = new 
ArrayList<>(readTable.newSnapshotReader().read().dataSplits());
+        TableRead read = readTable.newRead();
+        RowType rowType = readTable.rowType();
+        List<String> res = getResult(read, splits, row -> 
internalRowToString(row, rowType));
+        assertThat(res).hasSize(200);
+    }
+
     // Helper method from TableTestBase
     protected Identifier identifier(String tableName) {
         return new Identifier(database, tableName);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 736ce904cf..e3019485bb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.append;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
@@ -33,10 +35,14 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.DataEvolutionSplitRead;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.system.RowTrackingTable;
 import org.apache.paimon.types.DataField;
@@ -45,6 +51,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.UriReader;
 
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -512,6 +519,238 @@ public class BlobTableTest extends TableTestBase {
         assertThat(projectedCount.get()).isEqualTo(1);
     }
 
+    @Test
+    void testReadBlobAfterAlterTableAndCompaction() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        // Step 1: write data with schemaId=0
+        commitDefault(writeDataDefault(100, 1));
+
+        // Step 2: ALTER TABLE SET an unrelated option -> schemaId becomes 1
+        catalog.alterTable(
+                identifier(), 
SchemaChange.setOption("snapshot.num-retained.min", "5"), false);
+
+        // Step 3: write more data with schemaId=1
+        commitDefault(writeDataDefault(100, 1));
+
+        // Step 4: compact blob table using DataEvolutionCompactCoordinator
+        FileStoreTable table = getTableDefault();
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        // Step 5: read after compaction
+        readDefault(row -> 
assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes));
+    }
+
+    @Test
+    void testReadBlobAfterAddColumnAndCompaction() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        {
+            FileStoreTable t = getTableDefault();
+            StreamWriteBuilder b = 
t.newStreamWriteBuilder().withCommitUser(commitUser);
+            try (StreamTableWrite w = b.newWrite()) {
+                for (int j = 0; j < 100; j++) {
+                    w.write(
+                            GenericRow.of(
+                                    1, BinaryString.fromString("batch1"), new 
BlobData(blobBytes)));
+                }
+                commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+            }
+        }
+
+        catalog.alterTable(identifier(), SchemaChange.addColumn("f3", 
DataTypes.STRING()), false);
+
+        {
+            FileStoreTable t = getTableDefault();
+            StreamWriteBuilder b = 
t.newStreamWriteBuilder().withCommitUser(commitUser);
+            try (StreamTableWrite w = b.newWrite()) {
+                for (int j = 0; j < 100; j++) {
+                    w.write(
+                            GenericRow.of(
+                                    2,
+                                    BinaryString.fromString("batch2"),
+                                    new BlobData(blobBytes),
+                                    BinaryString.fromString("after-add")));
+                }
+                commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+            }
+        }
+
+        FileStoreTable table = getTableDefault();
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        AtomicInteger batch1Count = new AtomicInteger(0);
+        AtomicInteger batch2Count = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+                    if (row.getInt(0) == 1) {
+                        batch1Count.incrementAndGet();
+                    } else if (row.getInt(0) == 2) {
+                        batch2Count.incrementAndGet();
+                    }
+                });
+        assertThat(batch1Count.get()).isEqualTo(100);
+        assertThat(batch2Count.get()).isEqualTo(100);
+    }
+
+    @Test
+    void testReadBlobAfterDropColumnAndCompaction() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.column("f3", DataTypes.STRING());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        {
+            FileStoreTable t = getTableDefault();
+            StreamWriteBuilder b = 
t.newStreamWriteBuilder().withCommitUser(commitUser);
+            try (StreamTableWrite w = b.newWrite()) {
+                for (int j = 0; j < 100; j++) {
+                    w.write(
+                            GenericRow.of(
+                                    1,
+                                    BinaryString.fromString("batch1"),
+                                    new BlobData(blobBytes),
+                                    BinaryString.fromString("before-drop")));
+                }
+                commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+            }
+        }
+
+        catalog.alterTable(identifier(), SchemaChange.dropColumn("f3"), false);
+
+        {
+            FileStoreTable t = getTableDefault();
+            StreamWriteBuilder b = 
t.newStreamWriteBuilder().withCommitUser(commitUser);
+            try (StreamTableWrite w = b.newWrite()) {
+                for (int j = 0; j < 100; j++) {
+                    w.write(
+                            GenericRow.of(
+                                    2, BinaryString.fromString("batch2"), new 
BlobData(blobBytes)));
+                }
+                commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+            }
+        }
+
+        FileStoreTable table = getTableDefault();
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        AtomicInteger batch1Count = new AtomicInteger(0);
+        AtomicInteger batch2Count = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+                    if (row.getInt(0) == 1) {
+                        batch1Count.incrementAndGet();
+                    } else if (row.getInt(0) == 2) {
+                        batch2Count.incrementAndGet();
+                    }
+                });
+        assertThat(batch1Count.get()).isEqualTo(100);
+        assertThat(batch2Count.get()).isEqualTo(100);
+    }
+
+    @Disabled("Reproduce: rename blob column causes read failure after 
compaction")
+    @Test
+    void testRenameBlobColumnReadFailure() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        // Step 1: write blob data — blob files record writeCols=["f2"]
+        commitDefault(writeDataDefault(100, 1));
+
+        // Step 2: rename blob column f2 -> f2_renamed
+        catalog.alterTable(identifier(), SchemaChange.renameColumn("f2", 
"f2_renamed"), false);
+
+        // Step 3: write more data — new blob files have 
writeCols=["f2_renamed"]
+        commitDefault(writeDataDefault(100, 1));
+
+        // Step 4: compact merges files into the same split
+        FileStoreTable table = getTableDefault();
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        // Step 5: read fails —
+        assertThatThrownBy(() -> readDefault(row -> {}))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("All files in this bunch should have the 
same write columns");
+    }
+
+    @Test
+    void testRenameBlobColumnShouldFail() throws Exception {
+        createTableDefault();
+        commitDefault(writeDataDefault(10, 1));
+
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier(),
+                                        SchemaChange.renameColumn("f2", 
"f2_renamed"),
+                                        false))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Cannot rename BLOB column");
+    }
+
     private void createExternalStorageTable() throws Exception {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index 4e85b1e3dd..9bb9a7274e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -297,6 +297,36 @@ public class DataEvolutionReadTest {
                 fileName, firstRowId, rowCount, maxSequenceNumber, 
Arrays.asList("blob_col"));
     }
 
+    /** Creates a blob file with a specified schemaId. */
+    private DataFileMeta createBlobFileWithSchema(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSequenceNumber,
+            long schemaId) {
+        return DataFileMeta.create(
+                fileName + ".blob",
+                rowCount,
+                rowCount,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0,
+                maxSequenceNumber,
+                schemaId,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                rowCount,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                Arrays.asList("blob_col"));
+    }
+
     /** Creates a blob file with custom write columns. */
     private DataFileMeta createBlobFileWithCols(
             String fileName,
@@ -327,6 +357,20 @@ public class DataEvolutionReadTest {
                 writeCols);
     }
 
+    @Test
+    void testAddBlobFilesWithDifferentSchemaId() {
+        DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1, 
0L);
+        DataFileMeta blobEntry2 = createBlobFileWithSchema("blob2", 100, 200, 
1, 1L);
+
+        blobBunch.add(blobEntry1);
+        assertThatCode(() -> 
blobBunch.add(blobEntry2)).doesNotThrowAnyException();
+
+        assertThat(blobBunch.files).hasSize(2);
+        assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L);
+        assertThat(blobBunch.files.get(1).schemaId()).isEqualTo(1L);
+        assertThat(blobBunch.rowCount()).isEqualTo(300);
+    }
+
     @Test
     public void testRowIdPushDown() {
         SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, 
true);
diff --git a/paimon-python/dev/run_mixed_tests.sh 
b/paimon-python/dev/run_mixed_tests.sh
index a7e5d5dd76..09bcb58d2b 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -244,6 +244,29 @@ run_compressed_text_test() {
     fi
 }
 
+run_blob_alter_compact_test() {
+    echo -e "${YELLOW}=== Running Blob Alter+Compact Test (Java 
Write+Alter+Compact, Python Read) ===${NC}"
+
+    cd "$PROJECT_ROOT"
+
+    echo "Running Maven test for JavaPyE2ETest.testBlobWriteAlterCompact..."
+    if mvn test 
-Dtest=org.apache.paimon.JavaPyE2ETest#testBlobWriteAlterCompact -pl 
paimon-core -q -Drun.e2e.tests=true; then
+        echo -e "${GREEN}✓ Java blob write+alter+compact test completed 
successfully${NC}"
+    else
+        echo -e "${RED}✗ Java blob write+alter+compact test failed${NC}"
+        return 1
+    fi
+    cd "$PAIMON_PYTHON_DIR"
+    echo "Running Python test for 
JavaPyReadWriteTest.test_read_blob_after_alter_and_compact..."
+    if python -m pytest 
java_py_read_write_test.py::JavaPyReadWriteTest::test_read_blob_after_alter_and_compact
 -v; then
+        echo -e "${GREEN}✓ Python blob read test completed successfully${NC}"
+        return 0
+    else
+        echo -e "${RED}✗ Python blob read test failed${NC}"
+        return 1
+    fi
+}
+
 # Main execution
 main() {
     local java_write_result=0
@@ -253,6 +276,7 @@ main() {
     local pk_dv_result=0
     local btree_index_result=0
     local compressed_text_result=0
+    local blob_alter_compact_result=0
 
     echo -e "${YELLOW}Starting mixed language test execution...${NC}"
     echo ""
@@ -311,6 +335,13 @@ main() {
 
     echo ""
 
+    # Run blob alter+compact test (Java write+alter+compact, Python read)
+    if ! run_blob_alter_compact_test; then
+        blob_alter_compact_result=1
+    fi
+
+    echo ""
+
     echo -e "${YELLOW}=== Test Results Summary ===${NC}"
 
     if [[ $java_write_result -eq 0 ]]; then
@@ -355,12 +386,18 @@ main() {
         echo -e "${RED}✗ Compressed Text Test (Java Write, Python Read): 
FAILED${NC}"
     fi
 
+    if [[ $blob_alter_compact_result -eq 0 ]]; then
+        echo -e "${GREEN}✓ Blob Alter+Compact Test (Java Write+Alter+Compact, 
Python Read): PASSED${NC}"
+    else
+        echo -e "${RED}✗ Blob Alter+Compact Test (Java Write+Alter+Compact, 
Python Read): FAILED${NC}"
+    fi
+
     echo ""
 
     # Clean up warehouse directory after all tests
     cleanup_warehouse
 
-    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$btree_index_result -eq 0 && $compressed_text_result -eq 0 ]]; then
+    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$btree_index_result -eq 0 && $compressed_text_result -eq 0 && 
$blob_alter_compact_result -eq 0 ]]; then
         echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability 
verified.${NC}"
         return 0
     else
@@ -370,4 +407,4 @@ main() {
 }
 
 # Run main function
-main "$@"
\ No newline at end of file
+main "$@"
diff --git a/paimon-python/pypaimon/read/reader/field_bunch.py 
b/paimon-python/pypaimon/read/reader/field_bunch.py
index fccf2cb325..f9f2dca3ce 100644
--- a/paimon-python/pypaimon/read/reader/field_bunch.py
+++ b/paimon-python/pypaimon/read/reader/field_bunch.py
@@ -99,10 +99,11 @@ class BlobBunch(FieldBunch):
                     )
 
             if self._files:
-                if file.schema_id != self._files[0].schema_id:
-                    raise ValueError(
-                        "All files in a blob bunch should have the same schema 
id."
-                    )
+                if not DataFileMeta.is_blob_file(file.file_name):
+                    if file.schema_id != self._files[0].schema_id:
+                        raise ValueError(
+                            "All files in a blob bunch should have the same 
schema id."
+                        )
                 if file.write_cols != self._files[0].write_cols:
                     raise ValueError(
                         "All files in a blob bunch should have the same write 
columns."
diff --git a/paimon-python/pypaimon/schema/schema_manager.py 
b/paimon-python/pypaimon/schema/schema_manager.py
index 0a18c77630..721925dc33 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -136,6 +136,18 @@ def _assert_not_updating_primary_keys(
         raise ValueError(f"Cannot {operation} primary key")
 
 
+def _assert_not_renaming_blob_column(
+        new_fields: List[DataField], field_names: List[str]):
+    if len(field_names) > 1:
+        return
+    field_name = field_names[0]
+    for field in new_fields:
+        if field.name == field_name and str(field.type) == 'BLOB':
+            raise ValueError(
+                f"Cannot rename BLOB column: [{field_name}]"
+            )
+
+
 def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]):
     field_name = change.field_names[-1]
     new_name = change.new_name
@@ -355,6 +367,7 @@ class SchemaManager:
                 _assert_not_updating_partition_keys(
                     old_table_schema, change.field_names, "rename"
                 )
+                _assert_not_renaming_blob_column(new_fields, 
change.field_names)
                 _handle_rename_column(change, new_fields)
             elif isinstance(change, DropColumn):
                 _drop_column_validation(old_table_schema, change)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index 0d3bf49cbd..7e4d6c6d26 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -24,6 +24,7 @@ import unittest
 import pyarrow as pa
 
 from pypaimon import CatalogFactory, Schema
+from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.write.commit_message import CommitMessage
 
@@ -3024,6 +3025,30 @@ class DataBlobWriterTest(unittest.TestCase):
         result = read.to_arrow(splits)
         self.assertEqual(result.num_rows, 1)
 
+    def test_rename_blob_column_should_fail(self):
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_col', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+            }
+        )
+        self.catalog.create_table('test_db.blob_rename_test', schema, False)
+
+        with self.assertRaises(RuntimeError) as ctx:
+            self.catalog.alter_table(
+                'test_db.blob_rename_test',
+                [SchemaChange.rename_column('blob_col', 'blob_col_renamed')],
+                False
+            )
+        self.assertIn('Cannot rename BLOB column', str(ctx.exception))
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py 
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 1f906c656d..6bbbcfc37f 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -503,3 +503,12 @@ class JavaPyReadWriteTest(unittest.TestCase):
             table_read.to_arrow(splits)
         self.assertIn(file_format, str(ctx.exception))
         self.assertIn("not yet supported", str(ctx.exception))
+
+    def test_read_blob_after_alter_and_compact(self):
+        table = self.catalog.get_table('default.blob_alter_compact_test')
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        result = table_read.to_arrow(splits)
+        self.assertEqual(result.num_rows, 200)

Reply via email to