This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d29a531a48 [Fix][Connectors-v2] fix dynamic bucket for paimon sink (#9595)
d29a531a48 is described below

commit d29a531a4831a441c872a22d9d26982b85f502ab
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Jul 31 16:30:21 2025 +0800

    [Fix][Connectors-v2] fix dynamic bucket for paimon sink (#9595)
---
 docs/en/connector-v2/sink/Paimon.md                |   4 +
 docs/zh/connector-v2/sink/Paimon.md                |   4 +
 .../seatunnel/paimon/config/PaimonSinkConfig.java  |   7 -
 .../seatunnel/paimon/sink/PaimonSink.java          |  11 +-
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    |  80 ++++++++--
 .../sink/bucket/FixedBucketRowKeyExtractor.java    |  72 +++++++++
 .../paimon/sink/bucket/PaimonBucketAssigner.java   |  87 ++++------
 .../sink/bucket/PaimonBucketAssignerFactory.java   |  66 ++++++++
 .../sink/bucket/RowAssignerChannelComputer.java    |  60 +++++++
 .../paimon/sink/writer/PaimonWriteTest.java        |  13 +-
 .../paimon/PaimonSinkDynamicBucketIT.java          | 176 ++++++++++++++++++++-
 .../src/test/resources/ddl/bucket.sql              |  47 ++++++
 .../fake_to_dynamic_bucket_paimon_case2.conf       |   2 +
 .../fake_to_dynamic_bucket_paimon_case3.conf       |   2 +
 .../fake_to_dynamic_bucket_paimon_case4.conf       |  12 ++
 .../fake_to_dynamic_bucket_paimon_case5.conf       |   2 +
 ...nf => fake_to_dynamic_bucket_paimon_case8.conf} |  28 +++-
 ...mysql_jdbc_to_dynamic_bucket_paimon_case1.conf} |  31 ++--
 ...mysql_jdbc_to_dynamic_bucket_paimon_case2.conf} |  31 ++--
 ...mysql_jdbc_to_dynamic_bucket_paimon_case3.conf} |  31 ++--
 20 files changed, 602 insertions(+), 164 deletions(-)

diff --git a/docs/en/connector-v2/sink/Paimon.md 
b/docs/en/connector-v2/sink/Paimon.md
index 8491b880b9..9f126895c9 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -448,6 +448,10 @@ sink {
 
 Single dynamic bucket table with write props of Paimon; it takes effect on primary key tables with bucket = -1.
 
+> Notes:
+> - Currently only the ordinary dynamic bucket mode is supported (the primary key must include all partition fields).
+> - When running in a cluster environment, `parallelism` must be set to `1`; otherwise, data duplication may occur.
+
 #### core options
 
 Please 
[reference](https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket)
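
To make the note above concrete, here is a rough Java sketch of the check this patch performs in PaimonSinkWriter before enabling dynamic bucketing. It reuses the Paimon types referenced later in this diff; the helper class itself is hypothetical and not part of the commit.

    import org.apache.paimon.table.BucketMode;
    import org.apache.paimon.table.FileStoreTable;

    // Hypothetical helper, not in the patch: when does the sink treat a table as dynamic-bucket?
    final class DynamicBucketCheck {
        static boolean isDynamicBucket(FileStoreTable table) {
            BucketMode mode = table.bucketMode();
            if (BucketMode.GLOBAL_DYNAMIC == mode) {
                // Cross-partition upsert (primary key missing some partition fields) is rejected;
                // bootstrapping the key-to-partition index is too costly (see the comment in the writer below).
                throw new UnsupportedOperationException(
                        "Cross Partitions Upsert Dynamic Bucket Mode is not supported.");
            }
            if (table.coreOptions().bucket() == -1 && BucketMode.UNAWARE == mode) {
                // Append-only tables with bucket = -1 do not get dynamic bucketing.
                return false;
            }
            return BucketMode.DYNAMIC == mode;
        }
    }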
diff --git a/docs/zh/connector-v2/sink/Paimon.md 
b/docs/zh/connector-v2/sink/Paimon.md
index f8aa5e330b..a386c903dc 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -444,6 +444,10 @@ sink {
 
 只有在主键表并指定bucket = -1时才会生效
 
+> 注意:
+> - 目前只支持普通动态桶模式(主键包含所有分区字段)。
+> - 在集群环境下运行时`parallelism`必须为`1`, 否则可能存在数据重复问题。
+
 #### 
核心参数:[参考官网](https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket)
 
 |               名称               |  类型  | 是否必须 |   默认值    |        描述        |
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index 857820b6a8..d54a65b80b 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -79,12 +79,5 @@ public class PaimonSinkConfig extends PaimonConfig {
         this.changelogTmpPath =
                 writeProps.getOrDefault(
                         PaimonSinkOptions.CHANGELOG_TMP_PATH, 
System.getProperty("java.io.tmpdir"));
-        checkConfig();
-    }
-
-    private void checkConfig() {
-        if (this.primaryKeys.isEmpty() && 
"-1".equals(this.writeProps.get("bucket"))) {
-            log.warn("Append only table currently do not support dynamic 
bucket");
-        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 0129438c83..44869f5793 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -36,6 +36,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfi
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitter;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
@@ -75,11 +76,14 @@ public class PaimonSink
 
     private final PaimonHadoopConfiguration paimonHadoopConfiguration;
 
+    private final PaimonBucketAssignerFactory paimonBucketAssignerFactory;
+
     public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
         this.readonlyConfig = readonlyConfig;
         this.paimonSinkConfig = new PaimonSinkConfig(readonlyConfig);
         this.catalogTable = catalogTable;
         this.paimonHadoopConfiguration = 
PaimonSecurityContext.loadHadoopConfig(paimonSinkConfig);
+        this.paimonBucketAssignerFactory = new PaimonBucketAssignerFactory();
     }
 
     @Override
@@ -96,7 +100,8 @@ public class PaimonSink
                 paimonTable,
                 jobContext,
                 paimonSinkConfig,
-                paimonHadoopConfiguration);
+                paimonHadoopConfiguration,
+                paimonBucketAssignerFactory);
     }
 
     @Override
@@ -113,10 +118,10 @@ public class PaimonSink
                 readonlyConfig,
                 catalogTable,
                 paimonTable,
-                states,
                 jobContext,
                 paimonSinkConfig,
-                paimonHadoopConfiguration);
+                paimonHadoopConfiguration,
+                paimonBucketAssignerFactory);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 6ba1b877ef..08d60c0507 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -39,6 +39,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnecto
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssigner;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.RowAssignerChannelComputer;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.schema.handler.AlterPaimonTableSchemaEventHandler;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
@@ -63,9 +65,11 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -93,10 +97,10 @@ public class PaimonSinkWriter
 
     private TableSchema sinkPaimonTableSchema;
 
-    private PaimonBucketAssigner bucketAssigner;
-
     private final boolean dynamicBucket;
 
+    private final PaimonBucketAssignerFactory paimonBucketAssignerFactory;
+
     private final PaimonCatalog paimonCatalog;
 
     private final TablePath paimonTablePath;
@@ -108,21 +112,30 @@ public class PaimonSinkWriter
 
     private final JobContext jobContext;
 
+    private final RowAssignerChannelComputer rowAssignerChannelComputer;
+
+    private final int parallelism;
+
+    private final int taskIndex;
+
+    private final Set<PaimonBucketAssigner> bucketAssigners = new HashSet<>();
+
     public PaimonSinkWriter(
             Context context,
             ReadonlyConfig readonlyConfig,
             CatalogTable catalogTable,
-            Table paimonFileStoretable,
+            Table paimonTable,
             JobContext jobContext,
             PaimonSinkConfig paimonSinkConfig,
-            PaimonHadoopConfiguration paimonHadoopConfiguration) {
+            PaimonHadoopConfiguration paimonHadoopConfiguration,
+            PaimonBucketAssignerFactory paimonBucketAssignerFactory) {
         this.sourceTableSchema = catalogTable.getTableSchema();
         this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
         this.jobContext = jobContext;
         this.paimonTablePath = catalogTable.getTablePath();
         this.paimonCatalog = PaimonCatalog.loadPaimonCatalog(readonlyConfig);
         this.paimonCatalog.open();
-        this.paimonFileStoretable = (FileStoreTable) paimonFileStoretable;
+        this.paimonFileStoretable = (FileStoreTable) paimonTable;
         CoreOptions.ChangelogProducer changelogProducer =
                 this.paimonFileStoretable.coreOptions().changelogProducer();
         if (Objects.nonNull(paimonSinkConfig.getChangelogProducer())
@@ -130,22 +143,35 @@ public class PaimonSinkWriter
             log.warn(
                     "configured the props named 'changelog-producer' which is 
not compatible with the options in table , so it will use the table's 
'changelog-producer'");
         }
+        this.rowAssignerChannelComputer =
+                new RowAssignerChannelComputer(
+                        paimonFileStoretable.schema(), 
context.getNumberOfParallelSubtasks());
+        
rowAssignerChannelComputer.setup(context.getNumberOfParallelSubtasks());
+        this.paimonBucketAssignerFactory = paimonBucketAssignerFactory;
+        this.parallelism = context.getNumberOfParallelSubtasks();
+        this.taskIndex = context.getIndexOfSubtask();
         this.paimonSinkConfig = paimonSinkConfig;
         this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
         this.newTableWrite();
         BucketMode bucketMode = this.paimonFileStoretable.bucketMode();
-        this.dynamicBucket =
-                BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC 
== bucketMode;
-        int bucket = ((FileStoreTable) 
paimonFileStoretable).coreOptions().bucket();
+        // 
https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket
+        // When you need cross partition upsert (primary keys not contain all 
partition fields),
+        // Dynamic Bucket mode directly maintains the mapping of keys to 
partition and bucket, uses
+        // local disks, and initializes indexes by reading all existing keys 
in the table when
+        // starting job. For tables with a large amount of data, there will be 
a significant loss in
+        // performance. Moreover, initialization takes a long time. This mode 
is not supported at
+        // this time.
+        if (BucketMode.GLOBAL_DYNAMIC == bucketMode) {
+            throw new UnsupportedOperationException(
+                    "Cross Partitions Upsert Dynamic Bucket Mode is not 
supported.");
+        }
+        this.dynamicBucket = BucketMode.DYNAMIC == bucketMode;
+        int bucket = ((FileStoreTable) paimonTable).coreOptions().bucket();
         if (bucket == -1 && BucketMode.UNAWARE == bucketMode) {
             log.warn("Append only table currently do not support dynamic 
bucket");
         }
         if (dynamicBucket) {
-            this.bucketAssigner =
-                    new PaimonBucketAssigner(
-                            paimonFileStoretable,
-                            context.getNumberOfParallelSubtasks(),
-                            context.getIndexOfSubtask());
+            paimonBucketAssignerFactory.init(paimonTablePath, 
paimonFileStoretable, parallelism);
         }
         PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
     }
@@ -158,7 +184,8 @@ public class PaimonSinkWriter
             List<PaimonSinkState> states,
             JobContext jobContext,
             PaimonSinkConfig paimonSinkConfig,
-            PaimonHadoopConfiguration paimonHadoopConfiguration) {
+            PaimonHadoopConfiguration paimonHadoopConfiguration,
+            PaimonBucketAssignerFactory paimonBucketAssignerFactory) {
         this(
                 context,
                 readonlyConfig,
@@ -166,7 +193,8 @@ public class PaimonSinkWriter
                 paimonFileStoretable,
                 jobContext,
                 paimonSinkConfig,
-                paimonHadoopConfiguration);
+                paimonHadoopConfiguration,
+                paimonBucketAssignerFactory);
         if (Objects.isNull(states) || states.isEmpty()) {
             return;
         }
@@ -199,8 +227,20 @@ public class PaimonSinkWriter
             PaimonSecurityContext.runSecured(
                     () -> {
                         if (dynamicBucket) {
-                            int bucket = bucketAssigner.assign(rowData);
-                            tableWrite.write(rowData, bucket);
+                            // The result of calculating the remainder of the 
parallelism using the
+                            // hash code of the primary key must be consistent 
with the task
+                            // sequence number.
+                            PaimonBucketAssigner bucketAssigner =
+                                    
paimonBucketAssignerFactory.getBucketAssigner(
+                                            paimonTablePath,
+                                            
rowAssignerChannelComputer.channel(rowData));
+                            // When multiple threads call assigner.assign() 
simultaneously, they can
+                            // corrupt the internal hash map structure, 
leading to the
+                            // ArrayIndexOutOfBoundsException during rehashing 
operations
+                            synchronized (bucketAssigner) {
+                                tableWrite.write(rowData, 
bucketAssigner.assign(rowData));
+                                bucketAssigners.add(bucketAssigner);
+                            }
                         } else {
                             tableWrite.write(rowData);
                         }
@@ -256,6 +296,11 @@ public class PaimonSinkWriter
             List<CommitMessage> fileCommittables =
                     ((StreamTableWrite) 
tableWrite).prepareCommit(waitCompaction(), checkpointId);
             committables.addAll(fileCommittables);
+            if (!bucketAssigners.isEmpty()) {
+                List<PaimonBucketAssigner> assigners = new 
ArrayList<>(bucketAssigners);
+                bucketAssigners.clear();
+                assigners.forEach(assigner -> 
assigner.prepareCommit(checkpointId));
+            }
             return Optional.of(new PaimonCommitInfo(fileCommittables, 
checkpointId));
         } catch (Exception e) {
             throw new PaimonConnectorException(
@@ -282,6 +327,7 @@ public class PaimonSinkWriter
             tableWriteClose(this.tableWrite);
         } finally {
             committables.clear();
+            paimonBucketAssignerFactory.clear(paimonTablePath, taskIndex);
             if (Objects.nonNull(paimonCatalog)) {
                 paimonCatalog.close();
             }
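
For readability, the per-row path that the PaimonSinkWriter changes above introduce for dynamic-bucket tables can be summarized as follows. Field names are taken from the diff; this condensed method is illustrative only, not the committed code.

    // Illustrative condensation of the dynamic-bucket branch in write(), not the committed code.
    private void writeDynamicBucket(InternalRow rowData) throws Exception {
        // Every subtask computes the same channel for a given partition/primary key,
        // so a key is always handled by the assigner that owns that channel.
        int channel = rowAssignerChannelComputer.channel(rowData);
        PaimonBucketAssigner assigner =
                paimonBucketAssignerFactory.getBucketAssigner(paimonTablePath, channel);
        // assign() mutates the assigner's internal index; concurrent calls from several
        // writer threads are serialized to avoid corrupting it (the
        // ArrayIndexOutOfBoundsException mentioned in the comment above).
        synchronized (assigner) {
            tableWrite.write(rowData, assigner.assign(rowData));
            bucketAssigners.add(assigner);
        }
    }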
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/FixedBucketRowKeyExtractor.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/FixedBucketRowKeyExtractor.java
new file mode 100644
index 0000000000..8a2a453cb8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/FixedBucketRowKeyExtractor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.table.sink.RowKeyExtractor;
+
+public class FixedBucketRowKeyExtractor extends RowKeyExtractor {
+    private final int numBuckets;
+    private final boolean sameBucketKeyAndTrimmedPrimaryKey;
+    private final Projection bucketKeyProjection;
+    private BinaryRow reuseBucketKey;
+    private Integer reuseBucket;
+
+    public FixedBucketRowKeyExtractor(TableSchema schema) {
+        super(schema);
+        this.numBuckets = (new CoreOptions(schema.options())).bucket();
+        this.sameBucketKeyAndTrimmedPrimaryKey =
+                schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
+        this.bucketKeyProjection =
+                CodeGenUtils.newProjection(
+                        schema.logicalRowType(), 
schema.projection(schema.bucketKeys()));
+    }
+
+    public void setRecord(InternalRow record) {
+        super.setRecord(record);
+        this.reuseBucketKey = null;
+        this.reuseBucket = null;
+    }
+
+    private BinaryRow bucketKey() {
+        if (this.sameBucketKeyAndTrimmedPrimaryKey) {
+            return this.trimmedPrimaryKey();
+        } else {
+            if (this.reuseBucketKey == null) {
+                this.reuseBucketKey = 
this.bucketKeyProjection.apply(this.record);
+            }
+            return this.reuseBucketKey;
+        }
+    }
+
+    public int bucket() {
+        BinaryRow bucketKey = this.bucketKey();
+        if (this.reuseBucket == null) {
+            this.reuseBucket =
+                    KeyAndBucketExtractor.bucket(
+                            
KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), this.numBuckets);
+        }
+
+        return this.reuseBucket;
+    }
+}
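
A minimal usage sketch for the extractor above (hypothetical method and variable names; imports as in the new file; it assumes a table whose options declare a fixed bucket count, since numBuckets is read from CoreOptions):

    // Sketch only, not part of the commit: how the extractor is driven per record.
    static int bucketOf(FixedBucketRowKeyExtractor extractor, InternalRow row) {
        extractor.setRecord(row);   // resets the cached bucket key and bucket for this record
        // trimmedPrimaryKey()/partition() are inherited from RowKeyExtractor;
        // bucket() hashes the bucket key modulo the configured bucket count.
        return extractor.bucket();
    }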
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
index 16804754fa..f7919fc4b4 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
@@ -17,77 +17,50 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
 
-import org.apache.paimon.crosspartition.IndexBootstrap;
-import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.index.SimpleHashBucketAssigner;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.index.HashBucketAssigner;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 public class PaimonBucketAssigner {
 
-    private final RowPartitionKeyExtractor extractor;
+    private boolean isRunning;
 
-    private final SimpleHashBucketAssigner simpleHashBucketAssigner;
+    private final FixedBucketRowKeyExtractor extractor;
 
-    private final TableSchema schema;
+    private final HashBucketAssigner hashBucketAssigner;
 
     public PaimonBucketAssigner(Table table, int numAssigners, int assignId) {
         FileStoreTable fileStoreTable = (FileStoreTable) table;
-        this.schema = fileStoreTable.schema();
-        this.extractor = new RowPartitionKeyExtractor(fileStoreTable.schema());
-        long dynamicBucketTargetRowNum =
-                ((FileStoreTable) 
table).coreOptions().dynamicBucketTargetRowNum();
-        this.simpleHashBucketAssigner =
-                new SimpleHashBucketAssigner(numAssigners, assignId, 
dynamicBucketTargetRowNum);
-        loadBucketIndex(fileStoreTable, numAssigners, assignId);
+        this.extractor = new 
FixedBucketRowKeyExtractor(fileStoreTable.schema());
+        long dynamicBucketTargetRowNum = 
fileStoreTable.coreOptions().dynamicBucketTargetRowNum();
+        this.hashBucketAssigner =
+                new HashBucketAssigner(
+                        fileStoreTable.snapshotManager(),
+                        "hash-bucket",
+                        fileStoreTable.store().newIndexFileHandler(),
+                        numAssigners,
+                        numAssigners,
+                        assignId,
+                        dynamicBucketTargetRowNum);
+        this.isRunning = true;
     }
 
-    private void loadBucketIndex(FileStoreTable fileStoreTable, int 
numAssigners, int assignId) {
-        IndexBootstrap indexBootstrap = new IndexBootstrap(fileStoreTable);
-        List<String> fieldNames = schema.fieldNames();
-        Map<String, Integer> fieldIndexMap =
-                IntStream.range(0, fieldNames.size())
-                        .boxed()
-                        .collect(Collectors.toMap(fieldNames::get, 
Function.identity()));
-        List<DataField> primaryKeys = schema.primaryKeysFields();
-        try (RecordReader<InternalRow> recordReader =
-                indexBootstrap.bootstrap(numAssigners, assignId)) {
-            RecordReaderIterator<InternalRow> readerIterator =
-                    new RecordReaderIterator<>(recordReader);
-            while (readerIterator.hasNext()) {
-                InternalRow row = readerIterator.next();
-                GenericRow binaryRow = new GenericRow(fieldNames.size());
-                for (int i = 0; i < primaryKeys.size(); i++) {
-                    String name = primaryKeys.get(i).name();
-                    DataType type = primaryKeys.get(i).type();
-                    binaryRow.setField(
-                            fieldIndexMap.get(name),
-                            InternalRow.createFieldGetter(type, 
i).getFieldOrNull(row));
-                }
-                assign(binaryRow);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+    public int assign(InternalRow rowData) {
+        extractor.setRecord(rowData);
+        return hashBucketAssigner.assign(
+                extractor.partition(), 
extractor.trimmedPrimaryKey().hashCode());
     }
 
-    public int assign(InternalRow rowData) {
-        int hash = extractor.trimmedPrimaryKey(rowData).hashCode();
-        return Math.abs(
-                
this.simpleHashBucketAssigner.assign(this.extractor.partition(rowData), hash));
+    public void prepareCommit(long commitIdentifier) {
+        hashBucketAssigner.prepareCommit(commitIdentifier);
+    }
+
+    public void finish() {
+        this.isRunning = false;
+    }
+
+    public boolean isRunning() {
+        return isRunning;
     }
 }
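
A rough sketch of the assigner's lifecycle as this patch uses it (hypothetical driver code; identifiers such as paimonTable, numAssigners, assignId, row, and checkpointId stand in for the writer's actual state):

    // Sketch only, not part of the commit.
    PaimonBucketAssigner assigner = new PaimonBucketAssigner(paimonTable, numAssigners, assignId);
    int bucket = assigner.assign(row);       // partition + trimmed primary-key hash -> bucket via HashBucketAssigner
    // ... write more rows ...
    assigner.prepareCommit(checkpointId);    // forwarded to HashBucketAssigner at each prepareCommit
    assigner.finish();                       // mark not running so PaimonBucketAssignerFactory can release it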
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerFactory.java
new file mode 100644
index 0000000000..f1c42f8e75
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import org.apache.paimon.table.Table;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PaimonBucketAssignerFactory implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final ConcurrentHashMap<TablePath, Map<Integer, 
PaimonBucketAssigner>>
+            bucketAssignerMap = new ConcurrentHashMap<>();
+
+    public PaimonBucketAssignerFactory() {}
+
+    public void init(final TablePath tableId, final Table table, final int 
numAssigners) {
+        bucketAssignerMap.computeIfAbsent(
+                tableId,
+                t -> {
+                    Map<Integer, PaimonBucketAssigner> map = new 
ConcurrentHashMap<>();
+                    for (int i = 0; i < numAssigners; i++) {
+                        map.put(i, new PaimonBucketAssigner(table, 
numAssigners, i));
+                    }
+                    return map;
+                });
+    }
+
+    public PaimonBucketAssigner getBucketAssigner(final TablePath tableId, 
final int assignId) {
+        return bucketAssignerMap.get(tableId).get(assignId);
+    }
+
+    public void clear(final TablePath tableId, final int assignId) {
+        if (bucketAssignerMap.containsKey(tableId)) {
+            Map<Integer, PaimonBucketAssigner> paimonBucketAssignerMap =
+                    bucketAssignerMap.get(tableId);
+            boolean isRunning =
+                    paimonBucketAssignerMap.values().stream()
+                            .anyMatch(PaimonBucketAssigner::isRunning);
+            if (!isRunning) {
+                bucketAssignerMap.remove(tableId);
+            } else {
+                paimonBucketAssignerMap.get(assignId).finish();
+            }
+        }
+    }
+}
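
The factory keeps one assigner per channel index for each table path and is shared by all writers created from the same PaimonSink instance. A hedged usage sketch (variable names are placeholders, not committed code):

    // Sketch only, not part of the commit.
    PaimonBucketAssignerFactory factory = new PaimonBucketAssignerFactory();
    factory.init(tablePath, paimonTable, parallelism);   // builds one assigner per channel, once per table
    PaimonBucketAssigner assigner = factory.getBucketAssigner(tablePath, channel);
    // On writer close: while other assigners for the table are still running, only this
    // task's assigner is marked finished; once none are running, the table entry is removed.
    factory.clear(tablePath, taskIndex);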
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/RowAssignerChannelComputer.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/RowAssignerChannelComputer.java
new file mode 100644
index 0000000000..0e0186935c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/RowAssignerChannelComputer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
+
+import static org.apache.paimon.index.BucketAssigner.computeAssigner;
+
+public class RowAssignerChannelComputer implements 
ChannelComputer<InternalRow> {
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+    private Integer numAssigners;
+
+    private transient int numChannels;
+    private transient RowPartitionKeyExtractor extractor;
+
+    public RowAssignerChannelComputer(TableSchema schema, Integer 
numAssigners) {
+        this.schema = schema;
+        this.numAssigners = numAssigners;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.numAssigners = MathUtils.min(numAssigners, numChannels);
+        this.extractor = new RowPartitionKeyExtractor(schema);
+    }
+
+    @Override
+    public int channel(InternalRow record) {
+        int partitionHash = extractor.partition(record).hashCode();
+        int keyHash = extractor.trimmedPrimaryKey(record).hashCode();
+        return computeAssigner(partitionHash, keyHash, numChannels, 
numAssigners);
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by key hash";
+    }
+}
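
And a short sketch of how the channel computer is wired up in the writer above (numberOfParallelSubtasks mirrors context.getNumberOfParallelSubtasks() in the diff; illustrative only):

    // Sketch only, not part of the commit.
    RowAssignerChannelComputer computer =
            new RowAssignerChannelComputer(fileStoreTable.schema(), numberOfParallelSubtasks);
    computer.setup(numberOfParallelSubtasks);  // caps numAssigners at numChannels and builds the key extractor
    int channel = computer.channel(row);       // stable channel for a given partition + trimmed primary key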
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
index f0cb816f31..6c10c7f23d 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -238,7 +239,8 @@ public class PaimonWriteTest {
                         paimonCatalog.getPaimonTable(tablePath),
                         jobContext,
                         new PaimonSinkConfig(readonlyConfig),
-                        new PaimonHadoopConfiguration());
+                        new PaimonHadoopConfiguration(),
+                        new PaimonBucketAssignerFactory());
         Assertions.assertFalse(paimonSinkWriter.waitCompaction());
 
         jobContext.setJobMode(JobMode.BATCH);
@@ -250,7 +252,8 @@ public class PaimonWriteTest {
                         paimonCatalog.getPaimonTable(tablePath),
                         jobContext,
                         new PaimonSinkConfig(readonlyConfig),
-                        new PaimonHadoopConfiguration());
+                        new PaimonHadoopConfiguration(),
+                        new PaimonBucketAssignerFactory());
         Assertions.assertTrue(paimonSinkWriter.waitCompaction());
 
         Map<String, Object> properties = new HashMap<>();
@@ -270,7 +273,8 @@ public class PaimonWriteTest {
                         paimonCatalog.getPaimonTable(tablePath),
                         jobContext,
                         new PaimonSinkConfig(readonlyConfig),
-                        new PaimonHadoopConfiguration());
+                        new PaimonHadoopConfiguration(),
+                        new PaimonBucketAssignerFactory());
         Assertions.assertTrue(paimonSinkWriter.waitCompaction());
 
         writeProps.put("changelog-producer", "full-compaction");
@@ -283,7 +287,8 @@ public class PaimonWriteTest {
                         paimonCatalog.getPaimonTable(tablePath),
                         jobContext,
                         new PaimonSinkConfig(readonlyConfig),
-                        new PaimonHadoopConfiguration());
+                        new PaimonHadoopConfiguration(),
+                        new PaimonBucketAssignerFactory());
         Assertions.assertTrue(paimonSinkWriter.waitCompaction());
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
index 4b74575da8..f247a6461e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
@@ -19,19 +19,25 @@ package org.apache.seatunnel.e2e.connector.paimon;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
@@ -41,29 +47,42 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
 
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static 
org.apache.seatunnel.e2e.common.container.AbstractTestContainer.HOST_VOLUME_MOUNT_PATH;
 import static org.awaitility.Awaitility.given;
@@ -81,7 +100,49 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
 
     private Map<String, Object> PAIMON_SINK_PROPERTIES;
 
-    @BeforeEach
+    private static final String MYSQL_DATABASE = "bucket";
+    private static final String SOURCE_TABLE = "test_dynamic_bucket";
+
+    private static final String MYSQL_HOST = "mysql_e2e";
+    private static final String MYSQL_USER_NAME = "mysqluser";
+    private static final String MYSQL_USER_PASSWORD = "mysqlpw";
+
+    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase bucketDatabase =
+            new UniqueDatabase(
+                    MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", 
MYSQL_DATABASE);
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        return new MySqlContainer(version)
+                .withConfigurationOverride("docker/server-gtids/my.cnf")
+                .withSetupSQL("docker/setup.sql")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(MYSQL_HOST)
+                .withDatabaseName(MYSQL_DATABASE)
+                .withUsername(MYSQL_USER_NAME)
+                .withPassword(MYSQL_USER_PASSWORD)
+                .withLogConsumer(
+                        new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
+    }
+
+    private String driverUrl() {
+        return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+    }
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib 
&& cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), 
extraCommands.getStderr());
+            };
+
+    @BeforeAll
     @Override
     public void startUp() throws Exception {
         this.isWindows =
@@ -102,11 +163,20 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
         paimonHadoopConf.put("dfs.client.use.datanode.hostname", "true");
         map.put("paimon.hadoop.conf", paimonHadoopConf);
         this.PAIMON_SINK_PROPERTIES = map;
+        log.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        log.info("Mysql Containers are started");
+        bucketDatabase.createAndInitialize();
+        log.info("Mysql ddl execution is complete");
     }
 
-    @AfterEach
+    @AfterAll
     @Override
-    public void tearDown() throws Exception {}
+    public void tearDown() throws Exception {
+        if (MYSQL_CONTAINER != null) {
+            MYSQL_CONTAINER.close();
+        }
+    }
 
     @TestTemplate
     public void testWriteAndReadPaimon(TestContainer container)
@@ -121,6 +191,93 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
         Assertions.assertEquals(0, readProjectionResult.getExitCode());
     }
 
+    @TestTemplate
+    public void testWriteForDifferentParallelism(TestContainer container)
+            throws IOException, InterruptedException, SQLException {
+        // parallelism = 3
+        Container.ExecResult textWriteResult1 =
+                
container.executeJob("/mysql_jdbc_to_dynamic_bucket_paimon_case1.conf");
+        Assertions.assertEquals(0, textWriteResult1.getExitCode());
+        try (Connection jdbcConnection = bucketDatabase.getJdbcConnection();
+                Statement statement = jdbcConnection.createStatement()) {
+            statement.executeUpdate(
+                    "update bucket.test_dynamic_bucket set version = '2' where 
id <= 102");
+            statement.executeUpdate(
+                    "update bucket.test_dynamic_bucket set version = '3' where 
id = 105");
+            statement.executeUpdate(
+                    "update bucket.test_dynamic_bucket set version = '4' where 
id = 109");
+        }
+        // parallelism = 1
+        Container.ExecResult textWriteResult2 =
+                
container.executeJob("/mysql_jdbc_to_dynamic_bucket_paimon_case2.conf");
+        Assertions.assertEquals(0, textWriteResult2.getExitCode());
+        List<String> parallelism_1 = verifyData(container);
+
+        // parallelism = 2
+        Container.ExecResult textWriteResult3 =
+                
container.executeJob("/mysql_jdbc_to_dynamic_bucket_paimon_case3.conf");
+        Assertions.assertEquals(0, textWriteResult3.getExitCode());
+
+        List<String> parallelism_2 = verifyData(container);
+        Assertions.assertEquals(parallelism_1, parallelism_2);
+    }
+
+    private List<String> verifyData(TestContainer container) {
+        List<InternalRow> actual = new ArrayList<>();
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            FileStoreTable table =
+                                    (FileStoreTable) 
getTable("mysql_to_paimon", SOURCE_TABLE);
+                            RowType rowType = table.rowType();
+                            String[] fields = new String[] {"id", "version"};
+                            int[] projection = getProjection(fields, rowType);
+                            DataType[] projectionDataTypes =
+                                    getProjectionFieldTypes(fields, rowType);
+                            ReadBuilder readBuilder =
+                                    
table.newReadBuilder().withProjection(projection);
+                            List<Split> splits = 
readBuilder.newScan().plan().splits();
+
+                            try (RecordReader<InternalRow> reader =
+                                    
readBuilder.newRead().executeFilter().createReader(splits)) {
+
+                                reader.forEachRemaining(
+                                        row -> {
+                                            GenericRow binaryRow =
+                                                    new 
GenericRow(projectionDataTypes.length);
+                                            for (int i = 0; i < 
projectionDataTypes.length; i++) {
+                                                DataType type = 
projectionDataTypes[i];
+                                                binaryRow.setField(
+                                                        i,
+                                                        
InternalRow.createFieldGetter(type, i)
+                                                                
.getFieldOrNull(row));
+                                            }
+                                            actual.add(binaryRow);
+                                        });
+                            }
+                            Assertions.assertEquals(10, actual.size());
+                        });
+        return 
actual.stream().map(Object::toString).collect(Collectors.toList());
+    }
+
+    private static DataType[] getProjectionFieldTypes(String[] projection, 
RowType rowType) {
+        List<String> fieldNames = rowType.getFieldNames();
+        Map<String, Integer> collect =
+                IntStream.range(0, fieldNames.size())
+                        .boxed()
+                        .collect(Collectors.toMap(fieldNames::get, 
Function.identity()));
+        return Arrays.stream(projection)
+                .map(field -> rowType.getTypeAt(collect.get(field)))
+                .toArray(DataType[]::new);
+    }
+
+    private int[] getProjection(String[] projection, RowType rowType) {
+        return 
Arrays.stream(projection).mapToInt(rowType::getFieldIndex).toArray();
+    }
+
     @TestTemplate
     public void testBucketCount(TestContainer container) throws IOException, 
InterruptedException {
         Container.ExecResult textWriteResult =
@@ -227,8 +384,11 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
     public void testCDCParallelismBucketCount(TestContainer container)
             throws IOException, InterruptedException {
         Container.ExecResult textWriteResult =
-                
container.executeJob("/fake_to_dynamic_bucket_paimon_case4.conf");
+                
container.executeJob("/fake_to_dynamic_bucket_paimon_case8.conf");
         Assertions.assertEquals(0, textWriteResult.getExitCode());
+        Container.ExecResult textWriteResult1 =
+                
container.executeJob("/fake_to_dynamic_bucket_paimon_case4.conf");
+        Assertions.assertEquals(0, textWriteResult1.getExitCode());
         given().ignoreExceptions()
                 .await()
                 .atLeast(100L, TimeUnit.MILLISECONDS)
@@ -258,7 +418,7 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
                                                             
Map.Entry::getValue,
                                                             HashMap::new,
                                                             
Collectors.counting()));
-                            Assertions.assertEquals(4, bucketCountMap.size());
+                            Assertions.assertEquals(2, bucketCountMap.size());
                             Assertions.assertEquals(5, bucketCountMap.get(0));
                         });
     }
@@ -392,7 +552,7 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
                                         intArray,
                                         row.getString(2),
                                         row.getBoolean(3),
-                                        row.getShort(4),
+                                        row.getByte(4),
                                         row.getShort(5),
                                         row.getInt(6),
                                         row.getLong(7),
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/ddl/bucket.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/ddl/bucket.sql
new file mode 100644
index 0000000000..d4c61f122c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/ddl/bucket.sql
@@ -0,0 +1,47 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  bucket
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `bucket`;
+use bucket;
+
+drop table if exists test_dynamic_bucket;
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE test_dynamic_bucket (
+  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+  name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+  description VARCHAR(512),
+  version VARCHAR(2)
+);
+
+
+INSERT INTO test_dynamic_bucket
+VALUES (101,"scooter","Small 2-wheel scooter",'1'),
+       (102,"car battery","12V car battery",'1'),
+       (103,"12-pack drill bits","12-pack of drill bits with sizes ranging 
from #40 to #3",'1'),
+       (104,"hammer","12oz carpenter's hammer",'1'),
+       (105,"hammer","14oz carpenter's hammer",'1'),
+       (106,"zhang","16oz carpenter's hammer",'1'),
+       (107,"rocks","box of assorted rocks",'1'),
+       (108,"jacket","water resistent black wind breaker",'1'),
+       (109,"hawk","water resistent black wind breaker",'1'),
+       (110,"spare tire","24 inch spare tire",'1');
+
+
+
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
index 993543effd..7d5dc77b46 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
@@ -28,6 +28,8 @@ source {
     auto.increment.enabled = true
     auto.increment.start = 1
     row.num = 100000
+    auto.increment.enabled = true
+    auto.increment.start = 1
     schema = {
       fields {
         pk_id = bigint
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
index 48ed12b65f..ae6ffadd80 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
@@ -28,6 +28,8 @@ source {
     auto.increment.enabled = true
     auto.increment.start = 1
     row.num = 100000
+    auto.increment.enabled = true
+    auto.increment.start = 1
     schema = {
       fields {
         pk_id = bigint
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
index 0270b7de36..33272b0093 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
@@ -64,6 +64,18 @@ source {
       {
         kind = INSERT
         fields = [7, "G", 100]
+      },
+      {
+        kind = INSERT
+        fields = [8, "H", 100]
+      },
+      {
+        kind = INSERT
+        fields = [9, "I", 100]
+      },
+      {
+        kind = INSERT
+        fields = [10, "J", 100]
       }
     ]
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
index 92df043c7d..862ce20365 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
@@ -28,6 +28,8 @@ source {
     auto.increment.enabled = true
     auto.increment.start = 1000000
     row.num = 100000
+    auto.increment.enabled = true
+    auto.increment.start = 1
     schema = {
       fields {
         pk_id = bigint
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case8.conf
similarity index 79%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case8.conf
index 0270b7de36..4aaf49dff2 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case8.conf
@@ -19,7 +19,7 @@
 ######
 
 env {
-  parallelism = 2
+  parallelism = 1
   job.mode = "BATCH"
 }
 
@@ -64,6 +64,18 @@ source {
       {
         kind = INSERT
         fields = [7, "G", 100]
+      },
+      {
+        kind = INSERT
+        fields = [8, "H", 100]
+      },
+      {
+        kind = INSERT
+        fields = [9, "I", 100]
+      },
+      {
+        kind = INSERT
+        fields = [10, "J", 100]
       }
     ]
   }
@@ -71,12 +83,12 @@ source {
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/seatunnel_mnt/paimon"
-    database = "default"
-    table = "st_test_4"
-    paimon.table.write-props = {
-      bucket = -1
-      dynamic-bucket.target-row-num = 5
+      warehouse = "file:///tmp/seatunnel_mnt/paimon"
+      database = "default"
+      table = "st_test_4"
+      paimon.table.write-props = {
+         bucket = -1
+         dynamic-bucket.target-row-num = 5
+      }
     }
-  }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case1.conf
similarity index 70%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case1.conf
index 48ed12b65f..a1e41b351f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case1.conf
@@ -19,37 +19,28 @@
 ######
 
 env {
+  # You can set engine configuration here
   parallelism = 3
   job.mode = "BATCH"
 }
 
 source {
-  FakeSource {
-    auto.increment.enabled = true
-    auto.increment.start = 1
-    row.num = 100000
-    schema = {
-      fields {
-        pk_id = bigint
-        name = string
-        score = int
-      }
-      primaryKey {
-        name = "pk_id"
-        columnNames = [pk_id]
-      }
-    }
+  jdbc {
+    url = "jdbc:mysql://mysql_e2e:3306/bucket"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user_source"
+    password = "mysqlpw"
+    table_path = "bucket.test_dynamic_bucket"
   }
 }
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/seatunnel_mnt/paimon"
-    database = "default"
-    table = "st_test_3"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
+    database = "mysql_to_paimon"
+    table = "test_dynamic_bucket"
     paimon.table.write-props = {
       bucket = -1
-      dynamic-bucket.target-row-num = 50000
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case2.conf
similarity index 70%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case2.conf
index 993543effd..22f7183b9c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case2.conf
@@ -19,37 +19,28 @@
 ######
 
 env {
+  # You can set engine configuration here
   parallelism = 1
   job.mode = "BATCH"
 }
 
 source {
-  FakeSource {
-    auto.increment.enabled = true
-    auto.increment.start = 1
-    row.num = 100000
-    schema = {
-      fields {
-        pk_id = bigint
-        name = string
-        score = int
-      }
-      primaryKey {
-        name = "pk_id"
-        columnNames = [pk_id]
-      }
-    }
+  jdbc {
+    url = "jdbc:mysql://mysql_e2e:3306/bucket"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user_source"
+    password = "mysqlpw"
+    table_path = "bucket.test_dynamic_bucket"
   }
 }
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/seatunnel_mnt/paimon"
-    database = "default"
-    table = "st_test_2"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
+    database = "mysql_to_paimon"
+    table = "test_dynamic_bucket"
     paimon.table.write-props = {
       bucket = -1
-      dynamic-bucket.target-row-num = 50000
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case3.conf
similarity index 70%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case3.conf
index 993543effd..22f7183b9c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case3.conf
@@ -19,37 +19,28 @@
 ######
 
 env {
+  # You can set engine configuration here
   parallelism = 1
   job.mode = "BATCH"
 }
 
 source {
-  FakeSource {
-    auto.increment.enabled = true
-    auto.increment.start = 1
-    row.num = 100000
-    schema = {
-      fields {
-        pk_id = bigint
-        name = string
-        score = int
-      }
-      primaryKey {
-        name = "pk_id"
-        columnNames = [pk_id]
-      }
-    }
+  jdbc {
+    url = "jdbc:mysql://mysql_e2e:3306/bucket"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user_source"
+    password = "mysqlpw"
+    table_path = "bucket.test_dynamic_bucket"
   }
 }
 
 sink {
   Paimon {
-    warehouse = "file:///tmp/seatunnel_mnt/paimon"
-    database = "default"
-    table = "st_test_2"
+    warehouse = "/tmp/seatunnel_mnt/paimon"
+    database = "mysql_to_paimon"
+    table = "test_dynamic_bucket"
     paimon.table.write-props = {
       bucket = -1
-      dynamic-bucket.target-row-num = 50000
     }
   }
-}
+}
\ No newline at end of file
