This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d29a531a48 [Fix][Connectors-v2] fix dynamic bucket for paimon sink
(#9595)
d29a531a48 is described below
commit d29a531a4831a441c872a22d9d26982b85f502ab
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Jul 31 16:30:21 2025 +0800
[Fix][Connectors-v2] fix dynamic bucket for paimon sink (#9595)
---
docs/en/connector-v2/sink/Paimon.md | 4 +
docs/zh/connector-v2/sink/Paimon.md | 4 +
.../seatunnel/paimon/config/PaimonSinkConfig.java | 7 -
.../seatunnel/paimon/sink/PaimonSink.java | 11 +-
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 80 ++++++++--
.../sink/bucket/FixedBucketRowKeyExtractor.java | 72 +++++++++
.../paimon/sink/bucket/PaimonBucketAssigner.java | 87 ++++------
.../sink/bucket/PaimonBucketAssignerFactory.java | 66 ++++++++
.../sink/bucket/RowAssignerChannelComputer.java | 60 +++++++
.../paimon/sink/writer/PaimonWriteTest.java | 13 +-
.../paimon/PaimonSinkDynamicBucketIT.java | 176 ++++++++++++++++++++-
.../src/test/resources/ddl/bucket.sql | 47 ++++++
.../fake_to_dynamic_bucket_paimon_case2.conf | 2 +
.../fake_to_dynamic_bucket_paimon_case3.conf | 2 +
.../fake_to_dynamic_bucket_paimon_case4.conf | 12 ++
.../fake_to_dynamic_bucket_paimon_case5.conf | 2 +
...nf => fake_to_dynamic_bucket_paimon_case8.conf} | 28 +++-
...mysql_jdbc_to_dynamic_bucket_paimon_case1.conf} | 31 ++--
...mysql_jdbc_to_dynamic_bucket_paimon_case2.conf} | 31 ++--
...mysql_jdbc_to_dynamic_bucket_paimon_case3.conf} | 31 ++--
20 files changed, 602 insertions(+), 164 deletions(-)
diff --git a/docs/en/connector-v2/sink/Paimon.md
b/docs/en/connector-v2/sink/Paimon.md
index 8491b880b9..9f126895c9 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -448,6 +448,10 @@ sink {
Single dynamic bucket table with write props of paimon,operates on the primary
key table and bucket is -1.
+> Notes:
+> - Currently only the ordinary dynamic bucket mode is supported (the primary
key must include all partition fields).
+> - When running in a cluster environment, `parallelism` must be set to `1`;
otherwise, data duplication may occur.
+
#### core options
Please
[reference](https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket)
diff --git a/docs/zh/connector-v2/sink/Paimon.md
b/docs/zh/connector-v2/sink/Paimon.md
index f8aa5e330b..a386c903dc 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -444,6 +444,10 @@ sink {
只有在主键表并指定bucket = -1时才会生效
+> 注意:
+> - 目前只支持普通动态桶模式(主键包含所有分区字段)。
+> - 在集群环境下运行时`parallelism`必须为`1`, 否则可能存在数据重复问题。
+
####
核心参数:[参考官网](https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket)
| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index 857820b6a8..d54a65b80b 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -79,12 +79,5 @@ public class PaimonSinkConfig extends PaimonConfig {
this.changelogTmpPath =
writeProps.getOrDefault(
PaimonSinkOptions.CHANGELOG_TMP_PATH,
System.getProperty("java.io.tmpdir"));
- checkConfig();
- }
-
- private void checkConfig() {
- if (this.primaryKeys.isEmpty() &&
"-1".equals(this.writeProps.get("bucket"))) {
- log.warn("Append only table currently do not support dynamic
bucket");
- }
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 0129438c83..44869f5793 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -36,6 +36,7 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfi
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler;
import
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitter;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
@@ -75,11 +76,14 @@ public class PaimonSink
private final PaimonHadoopConfiguration paimonHadoopConfiguration;
+ private final PaimonBucketAssignerFactory paimonBucketAssignerFactory;
+
public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
this.readonlyConfig = readonlyConfig;
this.paimonSinkConfig = new PaimonSinkConfig(readonlyConfig);
this.catalogTable = catalogTable;
this.paimonHadoopConfiguration =
PaimonSecurityContext.loadHadoopConfig(paimonSinkConfig);
+ this.paimonBucketAssignerFactory = new PaimonBucketAssignerFactory();
}
@Override
@@ -96,7 +100,8 @@ public class PaimonSink
paimonTable,
jobContext,
paimonSinkConfig,
- paimonHadoopConfiguration);
+ paimonHadoopConfiguration,
+ paimonBucketAssignerFactory);
}
@Override
@@ -113,10 +118,10 @@ public class PaimonSink
readonlyConfig,
catalogTable,
paimonTable,
- states,
jobContext,
paimonSinkConfig,
- paimonHadoopConfiguration);
+ paimonHadoopConfiguration,
+ paimonBucketAssignerFactory);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 6ba1b877ef..08d60c0507 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -39,6 +39,8 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnecto
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssigner;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.RowAssignerChannelComputer;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.schema.handler.AlterPaimonTableSchemaEventHandler;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
@@ -63,9 +65,11 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -93,10 +97,10 @@ public class PaimonSinkWriter
private TableSchema sinkPaimonTableSchema;
- private PaimonBucketAssigner bucketAssigner;
-
private final boolean dynamicBucket;
+ private final PaimonBucketAssignerFactory paimonBucketAssignerFactory;
+
private final PaimonCatalog paimonCatalog;
private final TablePath paimonTablePath;
@@ -108,21 +112,30 @@ public class PaimonSinkWriter
private final JobContext jobContext;
+ private final RowAssignerChannelComputer rowAssignerChannelComputer;
+
+ private final int parallelism;
+
+ private final int taskIndex;
+
+ private final Set<PaimonBucketAssigner> bucketAssigners = new HashSet<>();
+
public PaimonSinkWriter(
Context context,
ReadonlyConfig readonlyConfig,
CatalogTable catalogTable,
- Table paimonFileStoretable,
+ Table paimonTable,
JobContext jobContext,
PaimonSinkConfig paimonSinkConfig,
- PaimonHadoopConfiguration paimonHadoopConfiguration) {
+ PaimonHadoopConfiguration paimonHadoopConfiguration,
+ PaimonBucketAssignerFactory paimonBucketAssignerFactory) {
this.sourceTableSchema = catalogTable.getTableSchema();
this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
this.jobContext = jobContext;
this.paimonTablePath = catalogTable.getTablePath();
this.paimonCatalog = PaimonCatalog.loadPaimonCatalog(readonlyConfig);
this.paimonCatalog.open();
- this.paimonFileStoretable = (FileStoreTable) paimonFileStoretable;
+ this.paimonFileStoretable = (FileStoreTable) paimonTable;
CoreOptions.ChangelogProducer changelogProducer =
this.paimonFileStoretable.coreOptions().changelogProducer();
if (Objects.nonNull(paimonSinkConfig.getChangelogProducer())
@@ -130,22 +143,35 @@ public class PaimonSinkWriter
log.warn(
"configured the props named 'changelog-producer' which is
not compatible with the options in table , so it will use the table's
'changelog-producer'");
}
+ this.rowAssignerChannelComputer =
+ new RowAssignerChannelComputer(
+ paimonFileStoretable.schema(),
context.getNumberOfParallelSubtasks());
+
rowAssignerChannelComputer.setup(context.getNumberOfParallelSubtasks());
+ this.paimonBucketAssignerFactory = paimonBucketAssignerFactory;
+ this.parallelism = context.getNumberOfParallelSubtasks();
+ this.taskIndex = context.getIndexOfSubtask();
this.paimonSinkConfig = paimonSinkConfig;
this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
this.newTableWrite();
BucketMode bucketMode = this.paimonFileStoretable.bucketMode();
- this.dynamicBucket =
- BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC
== bucketMode;
- int bucket = ((FileStoreTable)
paimonFileStoretable).coreOptions().bucket();
+ //
https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket
+ // When you need cross partition upsert (primary keys not contain all
partition fields),
+ // Dynamic Bucket mode directly maintains the mapping of keys to
partition and bucket, uses
+ // local disks, and initializes indexes by reading all existing keys
in the table when
+ // starting job. For tables with a large amount of data, there will be
a significant loss in
+ // performance. Moreover, initialization takes a long time. This mode
is not supported at
+ // this time.
+ if (BucketMode.GLOBAL_DYNAMIC == bucketMode) {
+ throw new UnsupportedOperationException(
+ "Cross Partitions Upsert Dynamic Bucket Mode is not
supported.");
+ }
+ this.dynamicBucket = BucketMode.DYNAMIC == bucketMode;
+ int bucket = ((FileStoreTable) paimonTable).coreOptions().bucket();
if (bucket == -1 && BucketMode.UNAWARE == bucketMode) {
log.warn("Append only table currently do not support dynamic
bucket");
}
if (dynamicBucket) {
- this.bucketAssigner =
- new PaimonBucketAssigner(
- paimonFileStoretable,
- context.getNumberOfParallelSubtasks(),
- context.getIndexOfSubtask());
+ paimonBucketAssignerFactory.init(paimonTablePath,
paimonFileStoretable, parallelism);
}
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
}
@@ -158,7 +184,8 @@ public class PaimonSinkWriter
List<PaimonSinkState> states,
JobContext jobContext,
PaimonSinkConfig paimonSinkConfig,
- PaimonHadoopConfiguration paimonHadoopConfiguration) {
+ PaimonHadoopConfiguration paimonHadoopConfiguration,
+ PaimonBucketAssignerFactory paimonBucketAssignerFactory) {
this(
context,
readonlyConfig,
@@ -166,7 +193,8 @@ public class PaimonSinkWriter
paimonFileStoretable,
jobContext,
paimonSinkConfig,
- paimonHadoopConfiguration);
+ paimonHadoopConfiguration,
+ paimonBucketAssignerFactory);
if (Objects.isNull(states) || states.isEmpty()) {
return;
}
@@ -199,8 +227,20 @@ public class PaimonSinkWriter
PaimonSecurityContext.runSecured(
() -> {
if (dynamicBucket) {
- int bucket = bucketAssigner.assign(rowData);
- tableWrite.write(rowData, bucket);
+ // The result of calculating the remainder of the
parallelism using the
+ // hash code of the primary key must be consistent
with the task
+ // sequence number.
+ PaimonBucketAssigner bucketAssigner =
+
paimonBucketAssignerFactory.getBucketAssigner(
+ paimonTablePath,
+
rowAssignerChannelComputer.channel(rowData));
+ // When multiple threads call assigner.assign()
simultaneously, they can
+ // corrupt the internal hash map structure,
leading to the
+ // ArrayIndexOutOfBoundsException during rehashing
operations
+ synchronized (bucketAssigner) {
+ tableWrite.write(rowData,
bucketAssigner.assign(rowData));
+ bucketAssigners.add(bucketAssigner);
+ }
} else {
tableWrite.write(rowData);
}
@@ -256,6 +296,11 @@ public class PaimonSinkWriter
List<CommitMessage> fileCommittables =
((StreamTableWrite)
tableWrite).prepareCommit(waitCompaction(), checkpointId);
committables.addAll(fileCommittables);
+ if (!bucketAssigners.isEmpty()) {
+ List<PaimonBucketAssigner> assigners = new
ArrayList<>(bucketAssigners);
+ bucketAssigners.clear();
+ assigners.forEach(assigner ->
assigner.prepareCommit(checkpointId));
+ }
return Optional.of(new PaimonCommitInfo(fileCommittables,
checkpointId));
} catch (Exception e) {
throw new PaimonConnectorException(
@@ -282,6 +327,7 @@ public class PaimonSinkWriter
tableWriteClose(this.tableWrite);
} finally {
committables.clear();
+ paimonBucketAssignerFactory.clear(paimonTablePath, taskIndex);
if (Objects.nonNull(paimonCatalog)) {
paimonCatalog.close();
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/FixedBucketRowKeyExtractor.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/FixedBucketRowKeyExtractor.java
new file mode 100644
index 0000000000..8a2a453cb8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/FixedBucketRowKeyExtractor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.table.sink.RowKeyExtractor;
+
+public class FixedBucketRowKeyExtractor extends RowKeyExtractor {
+ private final int numBuckets;
+ private final boolean sameBucketKeyAndTrimmedPrimaryKey;
+ private final Projection bucketKeyProjection;
+ private BinaryRow reuseBucketKey;
+ private Integer reuseBucket;
+
+ public FixedBucketRowKeyExtractor(TableSchema schema) {
+ super(schema);
+ this.numBuckets = (new CoreOptions(schema.options())).bucket();
+ this.sameBucketKeyAndTrimmedPrimaryKey =
+ schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
+ this.bucketKeyProjection =
+ CodeGenUtils.newProjection(
+ schema.logicalRowType(),
schema.projection(schema.bucketKeys()));
+ }
+
+ public void setRecord(InternalRow record) {
+ super.setRecord(record);
+ this.reuseBucketKey = null;
+ this.reuseBucket = null;
+ }
+
+ private BinaryRow bucketKey() {
+ if (this.sameBucketKeyAndTrimmedPrimaryKey) {
+ return this.trimmedPrimaryKey();
+ } else {
+ if (this.reuseBucketKey == null) {
+ this.reuseBucketKey =
this.bucketKeyProjection.apply(this.record);
+ }
+ return this.reuseBucketKey;
+ }
+ }
+
+ public int bucket() {
+ BinaryRow bucketKey = this.bucketKey();
+ if (this.reuseBucket == null) {
+ this.reuseBucket =
+ KeyAndBucketExtractor.bucket(
+
KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), this.numBuckets);
+ }
+
+ return this.reuseBucket;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
index 16804754fa..f7919fc4b4 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
@@ -17,77 +17,50 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
-import org.apache.paimon.crosspartition.IndexBootstrap;
-import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.index.SimpleHashBucketAssigner;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.index.HashBucketAssigner;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
public class PaimonBucketAssigner {
- private final RowPartitionKeyExtractor extractor;
+ private boolean isRunning;
- private final SimpleHashBucketAssigner simpleHashBucketAssigner;
+ private final FixedBucketRowKeyExtractor extractor;
- private final TableSchema schema;
+ private final HashBucketAssigner hashBucketAssigner;
public PaimonBucketAssigner(Table table, int numAssigners, int assignId) {
FileStoreTable fileStoreTable = (FileStoreTable) table;
- this.schema = fileStoreTable.schema();
- this.extractor = new RowPartitionKeyExtractor(fileStoreTable.schema());
- long dynamicBucketTargetRowNum =
- ((FileStoreTable)
table).coreOptions().dynamicBucketTargetRowNum();
- this.simpleHashBucketAssigner =
- new SimpleHashBucketAssigner(numAssigners, assignId,
dynamicBucketTargetRowNum);
- loadBucketIndex(fileStoreTable, numAssigners, assignId);
+ this.extractor = new
FixedBucketRowKeyExtractor(fileStoreTable.schema());
+ long dynamicBucketTargetRowNum =
fileStoreTable.coreOptions().dynamicBucketTargetRowNum();
+ this.hashBucketAssigner =
+ new HashBucketAssigner(
+ fileStoreTable.snapshotManager(),
+ "hash-bucket",
+ fileStoreTable.store().newIndexFileHandler(),
+ numAssigners,
+ numAssigners,
+ assignId,
+ dynamicBucketTargetRowNum);
+ this.isRunning = true;
}
- private void loadBucketIndex(FileStoreTable fileStoreTable, int
numAssigners, int assignId) {
- IndexBootstrap indexBootstrap = new IndexBootstrap(fileStoreTable);
- List<String> fieldNames = schema.fieldNames();
- Map<String, Integer> fieldIndexMap =
- IntStream.range(0, fieldNames.size())
- .boxed()
- .collect(Collectors.toMap(fieldNames::get,
Function.identity()));
- List<DataField> primaryKeys = schema.primaryKeysFields();
- try (RecordReader<InternalRow> recordReader =
- indexBootstrap.bootstrap(numAssigners, assignId)) {
- RecordReaderIterator<InternalRow> readerIterator =
- new RecordReaderIterator<>(recordReader);
- while (readerIterator.hasNext()) {
- InternalRow row = readerIterator.next();
- GenericRow binaryRow = new GenericRow(fieldNames.size());
- for (int i = 0; i < primaryKeys.size(); i++) {
- String name = primaryKeys.get(i).name();
- DataType type = primaryKeys.get(i).type();
- binaryRow.setField(
- fieldIndexMap.get(name),
- InternalRow.createFieldGetter(type,
i).getFieldOrNull(row));
- }
- assign(binaryRow);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ public int assign(InternalRow rowData) {
+ extractor.setRecord(rowData);
+ return hashBucketAssigner.assign(
+ extractor.partition(),
extractor.trimmedPrimaryKey().hashCode());
}
- public int assign(InternalRow rowData) {
- int hash = extractor.trimmedPrimaryKey(rowData).hashCode();
- return Math.abs(
-
this.simpleHashBucketAssigner.assign(this.extractor.partition(rowData), hash));
+ public void prepareCommit(long commitIdentifier) {
+ hashBucketAssigner.prepareCommit(commitIdentifier);
+ }
+
+ public void finish() {
+ this.isRunning = false;
+ }
+
+ public boolean isRunning() {
+ return isRunning;
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerFactory.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerFactory.java
new file mode 100644
index 0000000000..f1c42f8e75
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import org.apache.paimon.table.Table;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PaimonBucketAssignerFactory implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final ConcurrentHashMap<TablePath, Map<Integer,
PaimonBucketAssigner>>
+ bucketAssignerMap = new ConcurrentHashMap<>();
+
+ public PaimonBucketAssignerFactory() {}
+
+ public void init(final TablePath tableId, final Table table, final int
numAssigners) {
+ bucketAssignerMap.computeIfAbsent(
+ tableId,
+ t -> {
+ Map<Integer, PaimonBucketAssigner> map = new
ConcurrentHashMap<>();
+ for (int i = 0; i < numAssigners; i++) {
+ map.put(i, new PaimonBucketAssigner(table,
numAssigners, i));
+ }
+ return map;
+ });
+ }
+
+ public PaimonBucketAssigner getBucketAssigner(final TablePath tableId,
final int assignId) {
+ return bucketAssignerMap.get(tableId).get(assignId);
+ }
+
+ public void clear(final TablePath tableId, final int assignId) {
+ if (bucketAssignerMap.containsKey(tableId)) {
+ Map<Integer, PaimonBucketAssigner> paimonBucketAssignerMap =
+ bucketAssignerMap.get(tableId);
+ boolean isRunning =
+ paimonBucketAssignerMap.values().stream()
+ .anyMatch(PaimonBucketAssigner::isRunning);
+ if (!isRunning) {
+ bucketAssignerMap.remove(tableId);
+ } else {
+ paimonBucketAssignerMap.get(assignId).finish();
+ }
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/RowAssignerChannelComputer.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/RowAssignerChannelComputer.java
new file mode 100644
index 0000000000..0e0186935c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/RowAssignerChannelComputer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
+
+import static org.apache.paimon.index.BucketAssigner.computeAssigner;
+
+public class RowAssignerChannelComputer implements
ChannelComputer<InternalRow> {
+ private static final long serialVersionUID = 1L;
+
+ private final TableSchema schema;
+ private Integer numAssigners;
+
+ private transient int numChannels;
+ private transient RowPartitionKeyExtractor extractor;
+
+ public RowAssignerChannelComputer(TableSchema schema, Integer
numAssigners) {
+ this.schema = schema;
+ this.numAssigners = numAssigners;
+ }
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ this.numAssigners = MathUtils.min(numAssigners, numChannels);
+ this.extractor = new RowPartitionKeyExtractor(schema);
+ }
+
+ @Override
+ public int channel(InternalRow record) {
+ int partitionHash = extractor.partition(record).hashCode();
+ int keyHash = extractor.trimmedPrimaryKey(record).hashCode();
+ return computeAssigner(partitionHash, keyHash, numChannels,
numAssigners);
+ }
+
+ @Override
+ public String toString() {
+ return "shuffle by key hash";
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
index f0cb816f31..6c10c7f23d 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
@@ -37,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -238,7 +239,8 @@ public class PaimonWriteTest {
paimonCatalog.getPaimonTable(tablePath),
jobContext,
new PaimonSinkConfig(readonlyConfig),
- new PaimonHadoopConfiguration());
+ new PaimonHadoopConfiguration(),
+ new PaimonBucketAssignerFactory());
Assertions.assertFalse(paimonSinkWriter.waitCompaction());
jobContext.setJobMode(JobMode.BATCH);
@@ -250,7 +252,8 @@ public class PaimonWriteTest {
paimonCatalog.getPaimonTable(tablePath),
jobContext,
new PaimonSinkConfig(readonlyConfig),
- new PaimonHadoopConfiguration());
+ new PaimonHadoopConfiguration(),
+ new PaimonBucketAssignerFactory());
Assertions.assertTrue(paimonSinkWriter.waitCompaction());
Map<String, Object> properties = new HashMap<>();
@@ -270,7 +273,8 @@ public class PaimonWriteTest {
paimonCatalog.getPaimonTable(tablePath),
jobContext,
new PaimonSinkConfig(readonlyConfig),
- new PaimonHadoopConfiguration());
+ new PaimonHadoopConfiguration(),
+ new PaimonBucketAssignerFactory());
Assertions.assertTrue(paimonSinkWriter.waitCompaction());
writeProps.put("changelog-producer", "full-compaction");
@@ -283,7 +287,8 @@ public class PaimonWriteTest {
paimonCatalog.getPaimonTable(tablePath),
jobContext,
new PaimonSinkConfig(readonlyConfig),
- new PaimonHadoopConfiguration());
+ new PaimonHadoopConfiguration(),
+ new PaimonBucketAssignerFactory());
Assertions.assertTrue(paimonSinkWriter.waitCompaction());
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
index 4b74575da8..f247a6461e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
@@ -19,19 +19,25 @@ package org.apache.seatunnel.e2e.connector.paimon;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.utils.SeaTunnelException;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
import
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
@@ -41,29 +47,42 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static
org.apache.seatunnel.e2e.common.container.AbstractTestContainer.HOST_VOLUME_MOUNT_PATH;
import static org.awaitility.Awaitility.given;
@@ -81,7 +100,49 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
private Map<String, Object> PAIMON_SINK_PROPERTIES;
- @BeforeEach
+ private static final String MYSQL_DATABASE = "bucket";
+ private static final String SOURCE_TABLE = "test_dynamic_bucket";
+
+ private static final String MYSQL_HOST = "mysql_e2e";
+ private static final String MYSQL_USER_NAME = "mysqluser";
+ private static final String MYSQL_USER_PASSWORD = "mysqlpw";
+
+ private static final MySqlContainer MYSQL_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase bucketDatabase =
+ new UniqueDatabase(
+ MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw",
MYSQL_DATABASE);
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ return new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
+ }
+
+ private String driverUrl() {
+ return
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+ }
+
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib
&& cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget "
+ + driverUrl());
+ Assertions.assertEquals(0, extraCommands.getExitCode(),
extraCommands.getStderr());
+ };
+
+ @BeforeAll
@Override
public void startUp() throws Exception {
this.isWindows =
@@ -102,11 +163,20 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
paimonHadoopConf.put("dfs.client.use.datanode.hostname", "true");
map.put("paimon.hadoop.conf", paimonHadoopConf);
this.PAIMON_SINK_PROPERTIES = map;
+ log.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ log.info("Mysql Containers are started");
+ bucketDatabase.createAndInitialize();
+ log.info("Mysql ddl execution is complete");
}
- @AfterEach
+ @AfterAll
@Override
- public void tearDown() throws Exception {}
+ public void tearDown() throws Exception {
+ if (MYSQL_CONTAINER != null) {
+ MYSQL_CONTAINER.close();
+ }
+ }
@TestTemplate
public void testWriteAndReadPaimon(TestContainer container)
@@ -121,6 +191,93 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
Assertions.assertEquals(0, readProjectionResult.getExitCode());
}
+ @TestTemplate
+ public void testWriteForDifferentParallelism(TestContainer container)
+ throws IOException, InterruptedException, SQLException {
+ // parallelism = 3
+ Container.ExecResult textWriteResult1 =
+
container.executeJob("/mysql_jdbc_to_dynamic_bucket_paimon_case1.conf");
+ Assertions.assertEquals(0, textWriteResult1.getExitCode());
+ try (Connection jdbcConnection = bucketDatabase.getJdbcConnection();
+ Statement statement = jdbcConnection.createStatement()) {
+ statement.executeUpdate(
+ "update bucket.test_dynamic_bucket set version = '2' where
id <= 102");
+ statement.executeUpdate(
+ "update bucket.test_dynamic_bucket set version = '3' where
id = 105");
+ statement.executeUpdate(
+ "update bucket.test_dynamic_bucket set version = '4' where
id = 109");
+ }
+ // parallelism = 1
+ Container.ExecResult textWriteResult2 =
+
container.executeJob("/mysql_jdbc_to_dynamic_bucket_paimon_case2.conf");
+ Assertions.assertEquals(0, textWriteResult2.getExitCode());
+ List<String> parallelism_1 = verifyData(container);
+
+ // parallelism = 2
+ Container.ExecResult textWriteResult3 =
+
container.executeJob("/mysql_jdbc_to_dynamic_bucket_paimon_case3.conf");
+ Assertions.assertEquals(0, textWriteResult3.getExitCode());
+
+ List<String> parallelism_2 = verifyData(container);
+ Assertions.assertEquals(parallelism_1, parallelism_2);
+ }
+
+ private List<String> verifyData(TestContainer container) {
+ List<InternalRow> actual = new ArrayList<>();
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(30L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ FileStoreTable table =
+ (FileStoreTable)
getTable("mysql_to_paimon", SOURCE_TABLE);
+ RowType rowType = table.rowType();
+ String[] fields = new String[] {"id", "version"};
+ int[] projection = getProjection(fields, rowType);
+ DataType[] projectionDataTypes =
+ getProjectionFieldTypes(fields, rowType);
+ ReadBuilder readBuilder =
+
table.newReadBuilder().withProjection(projection);
+ List<Split> splits =
readBuilder.newScan().plan().splits();
+
+ try (RecordReader<InternalRow> reader =
+
readBuilder.newRead().executeFilter().createReader(splits)) {
+
+ reader.forEachRemaining(
+ row -> {
+ GenericRow binaryRow =
+ new
GenericRow(projectionDataTypes.length);
+ for (int i = 0; i <
projectionDataTypes.length; i++) {
+ DataType type =
projectionDataTypes[i];
+ binaryRow.setField(
+ i,
+
InternalRow.createFieldGetter(type, i)
+
.getFieldOrNull(row));
+ }
+ actual.add(binaryRow);
+ });
+ }
+ Assertions.assertEquals(10, actual.size());
+ });
+ return
actual.stream().map(Object::toString).collect(Collectors.toList());
+ }
+
+ private static DataType[] getProjectionFieldTypes(String[] projection,
RowType rowType) {
+ List<String> fieldNames = rowType.getFieldNames();
+ Map<String, Integer> collect =
+ IntStream.range(0, fieldNames.size())
+ .boxed()
+ .collect(Collectors.toMap(fieldNames::get,
Function.identity()));
+ return Arrays.stream(projection)
+ .map(field -> rowType.getTypeAt(collect.get(field)))
+ .toArray(DataType[]::new);
+ }
+
+ private int[] getProjection(String[] projection, RowType rowType) {
+ return
Arrays.stream(projection).mapToInt(rowType::getFieldIndex).toArray();
+ }
+
@TestTemplate
public void testBucketCount(TestContainer container) throws IOException,
InterruptedException {
Container.ExecResult textWriteResult =
@@ -227,8 +384,11 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
public void testCDCParallelismBucketCount(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult textWriteResult =
-
container.executeJob("/fake_to_dynamic_bucket_paimon_case4.conf");
+
container.executeJob("/fake_to_dynamic_bucket_paimon_case8.conf");
Assertions.assertEquals(0, textWriteResult.getExitCode());
+ Container.ExecResult textWriteResult1 =
+
container.executeJob("/fake_to_dynamic_bucket_paimon_case4.conf");
+ Assertions.assertEquals(0, textWriteResult1.getExitCode());
given().ignoreExceptions()
.await()
.atLeast(100L, TimeUnit.MILLISECONDS)
@@ -258,7 +418,7 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
Map.Entry::getValue,
HashMap::new,
Collectors.counting()));
- Assertions.assertEquals(4, bucketCountMap.size());
+ Assertions.assertEquals(2, bucketCountMap.size());
Assertions.assertEquals(5, bucketCountMap.get(0));
});
}
@@ -392,7 +552,7 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
intArray,
row.getString(2),
row.getBoolean(3),
- row.getShort(4),
+ row.getByte(4),
row.getShort(5),
row.getInt(6),
row.getLong(7),
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/ddl/bucket.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/ddl/bucket.sql
new file mode 100644
index 0000000000..d4c61f122c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/ddl/bucket.sql
@@ -0,0 +1,47 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
----------------------------------------------------------------------------------------------------------------
+-- DATABASE: bucket
+--
----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `bucket`;
+use bucket;
+
+drop table if exists test_dynamic_bucket;
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE test_dynamic_bucket (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+ description VARCHAR(512),
+ version VARCHAR(2)
+);
+
+
+INSERT INTO test_dynamic_bucket
+VALUES (101,"scooter","Small 2-wheel scooter",'1'),
+ (102,"car battery","12V car battery",'1'),
+ (103,"12-pack drill bits","12-pack of drill bits with sizes ranging
from #40 to #3",'1'),
+ (104,"hammer","12oz carpenter's hammer",'1'),
+ (105,"hammer","14oz carpenter's hammer",'1'),
+ (106,"zhang","16oz carpenter's hammer",'1'),
+ (107,"rocks","box of assorted rocks",'1'),
+ (108,"jacket","water resistent black wind breaker",'1'),
+ (109,"hawk","water resistent black wind breaker",'1'),
+ (110,"spare tire","24 inch spare tire",'1');
+
+
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
index 993543effd..7d5dc77b46 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
@@ -28,6 +28,8 @@ source {
auto.increment.enabled = true
auto.increment.start = 1
row.num = 100000
+ auto.increment.enabled = true
+ auto.increment.start = 1
schema = {
fields {
pk_id = bigint
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
index 48ed12b65f..ae6ffadd80 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
@@ -28,6 +28,8 @@ source {
auto.increment.enabled = true
auto.increment.start = 1
row.num = 100000
+ auto.increment.enabled = true
+ auto.increment.start = 1
schema = {
fields {
pk_id = bigint
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
index 0270b7de36..33272b0093 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
@@ -64,6 +64,18 @@ source {
{
kind = INSERT
fields = [7, "G", 100]
+ },
+ {
+ kind = INSERT
+ fields = [8, "H", 100]
+ },
+ {
+ kind = INSERT
+ fields = [9, "I", 100]
+ },
+ {
+ kind = INSERT
+ fields = [10, "J", 100]
}
]
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
index 92df043c7d..862ce20365 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
@@ -28,6 +28,8 @@ source {
auto.increment.enabled = true
auto.increment.start = 1000000
row.num = 100000
+ auto.increment.enabled = true
+ auto.increment.start = 1
schema = {
fields {
pk_id = bigint
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case8.conf
similarity index 79%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case8.conf
index 0270b7de36..4aaf49dff2 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case8.conf
@@ -19,7 +19,7 @@
######
env {
- parallelism = 2
+ parallelism = 1
job.mode = "BATCH"
}
@@ -64,6 +64,18 @@ source {
{
kind = INSERT
fields = [7, "G", 100]
+ },
+ {
+ kind = INSERT
+ fields = [8, "H", 100]
+ },
+ {
+ kind = INSERT
+ fields = [9, "I", 100]
+ },
+ {
+ kind = INSERT
+ fields = [10, "J", 100]
}
]
}
@@ -71,12 +83,12 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/seatunnel_mnt/paimon"
- database = "default"
- table = "st_test_4"
- paimon.table.write-props = {
- bucket = -1
- dynamic-bucket.target-row-num = 5
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
+ database = "default"
+ table = "st_test_4"
+ paimon.table.write-props = {
+ bucket = -1
+ dynamic-bucket.target-row-num = 5
+ }
}
- }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case1.conf
similarity index 70%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case1.conf
index 48ed12b65f..a1e41b351f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case1.conf
@@ -19,37 +19,28 @@
######
env {
+ # You can set engine configuration here
parallelism = 3
job.mode = "BATCH"
}
source {
- FakeSource {
- auto.increment.enabled = true
- auto.increment.start = 1
- row.num = 100000
- schema = {
- fields {
- pk_id = bigint
- name = string
- score = int
- }
- primaryKey {
- name = "pk_id"
- columnNames = [pk_id]
- }
- }
+ jdbc {
+ url = "jdbc:mysql://mysql_e2e:3306/bucket"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user_source"
+ password = "mysqlpw"
+ table_path = "bucket.test_dynamic_bucket"
}
}
sink {
Paimon {
- warehouse = "file:///tmp/seatunnel_mnt/paimon"
- database = "default"
- table = "st_test_3"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
+ database = "mysql_to_paimon"
+ table = "test_dynamic_bucket"
paimon.table.write-props = {
bucket = -1
- dynamic-bucket.target-row-num = 50000
}
}
-}
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case2.conf
similarity index 70%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case2.conf
index 993543effd..22f7183b9c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case2.conf
@@ -19,37 +19,28 @@
######
env {
+ # You can set engine configuration here
parallelism = 1
job.mode = "BATCH"
}
source {
- FakeSource {
- auto.increment.enabled = true
- auto.increment.start = 1
- row.num = 100000
- schema = {
- fields {
- pk_id = bigint
- name = string
- score = int
- }
- primaryKey {
- name = "pk_id"
- columnNames = [pk_id]
- }
- }
+ jdbc {
+ url = "jdbc:mysql://mysql_e2e:3306/bucket"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user_source"
+ password = "mysqlpw"
+ table_path = "bucket.test_dynamic_bucket"
}
}
sink {
Paimon {
- warehouse = "file:///tmp/seatunnel_mnt/paimon"
- database = "default"
- table = "st_test_2"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
+ database = "mysql_to_paimon"
+ table = "test_dynamic_bucket"
paimon.table.write-props = {
bucket = -1
- dynamic-bucket.target-row-num = 50000
}
}
-}
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case3.conf
similarity index 70%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case3.conf
index 993543effd..22f7183b9c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_jdbc_to_dynamic_bucket_paimon_case3.conf
@@ -19,37 +19,28 @@
######
env {
+ # You can set engine configuration here
parallelism = 1
job.mode = "BATCH"
}
source {
- FakeSource {
- auto.increment.enabled = true
- auto.increment.start = 1
- row.num = 100000
- schema = {
- fields {
- pk_id = bigint
- name = string
- score = int
- }
- primaryKey {
- name = "pk_id"
- columnNames = [pk_id]
- }
- }
+ jdbc {
+ url = "jdbc:mysql://mysql_e2e:3306/bucket"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user_source"
+ password = "mysqlpw"
+ table_path = "bucket.test_dynamic_bucket"
}
}
sink {
Paimon {
- warehouse = "file:///tmp/seatunnel_mnt/paimon"
- database = "default"
- table = "st_test_2"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
+ database = "mysql_to_paimon"
+ table = "test_dynamic_bucket"
paimon.table.write-props = {
bucket = -1
- dynamic-bucket.target-row-num = 50000
}
}
-}
+}
\ No newline at end of file