This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0b1c9a52c9 [Feature][Connector-V2] Add multi-table sink support for AmazonDynamo… (#10497)
0b1c9a52c9 is described below
commit 0b1c9a52c99eb63fa0f6fc8a93d87b5c1ee89ec2
Author: Mohamed Talal Seif <[email protected]>
AuthorDate: Sun Mar 22 13:16:34 2026 +0200
[Feature][Connector-V2] Add multi-table sink support for AmazonDynamo… (#10497)
---
docs/en/connectors/sink/AmazonDynamoDB.md | 42 +++++-
docs/zh/connectors/sink/AmazonDynamoDB.md | 42 +++++-
.../config/AmazonDynamoDBConfig.java | 12 ++
.../config/AmazonDynamoDBSinkOptions.java | 18 +++
.../amazondynamodb/sink/AmazonDynamoDBSink.java | 10 +-
.../sink/AmazonDynamoDBSinkFactory.java | 11 +-
.../amazondynamodb/sink/AmazonDynamoDBWriter.java | 36 ++++-
.../amazondynamodb/sink/DynamoDbSinkClient.java | 150 +++++++++++++++++----
.../AmazonDynamoDBMultiTableSinkTest.java | 43 ++++++
9 files changed, 322 insertions(+), 42 deletions(-)
diff --git a/docs/en/connectors/sink/AmazonDynamoDB.md b/docs/en/connectors/sink/AmazonDynamoDB.md
index 4d7b05e15a..1bae54cdbc 100644
--- a/docs/en/connectors/sink/AmazonDynamoDB.md
+++ b/docs/en/connectors/sink/AmazonDynamoDB.md
@@ -11,6 +11,7 @@ Write data to Amazon DynamoDB
## Key Features
- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [x] [support multiple table write](../../introduction/concepts/connector-v2-features.md)
## Options
@@ -21,8 +22,11 @@ Write data to Amazon DynamoDB
| access_key_id | string | yes | - |
| secret_access_key | string | yes | - |
| table | string | yes | - |
-| batch_size | string | no | 25 |
-| common-options | | no | - |
+| batch_size | int | no | 25 |
+| max_retries | int | no | 10 |
+| retry_base_delay_ms | long | no | 100 |
+| retry_max_delay_ms | long | no | 5000 |
+| common-options | | no | - |
### url [string]
@@ -42,7 +46,23 @@ The access secret of Amazon DynamoDB.
### table [string]
-The table of Amazon DynamoDB.
+The table of Amazon DynamoDB. Supports `${table_name}` placeholder for multi-table sink scenarios.
+
+### batch_size [int]
+
+The number of records to batch before writing to Amazon DynamoDB.
+
+### max_retries [int]
+
+Maximum number of retries when DynamoDB returns unprocessed items in a batch write.
+
+### retry_base_delay_ms [long]
+
+Base delay in milliseconds for exponential backoff between retries.
+
+### retry_max_delay_ms [long]
+
+Maximum delay in milliseconds between retries regardless of retry count.
### common options
@@ -50,14 +70,26 @@ Sink plugin common parameters, please refer to [Sink Common Options](../common-o
## Example
+### Single table
```bash
-Amazondynamodb {
+AmazonDynamoDB {
url = "http://127.0.0.1:8000"
region = "us-east-1"
access_key_id = "dummy-key"
secret_access_key = "dummy-secret"
table = "TableName"
- }
+}
+```
+
+### Multiple table
+```bash
+AmazonDynamoDB {
+ url = "http://127.0.0.1:8000"
+ region = "us-east-1"
+ access_key_id = "dummy-key"
+ secret_access_key = "dummy-secret"
+ table = "${table_name}"
+}
```
## Changelog
diff --git a/docs/zh/connectors/sink/AmazonDynamoDB.md b/docs/zh/connectors/sink/AmazonDynamoDB.md
index 3caff15298..d7084211d5 100644
--- a/docs/zh/connectors/sink/AmazonDynamoDB.md
+++ b/docs/zh/connectors/sink/AmazonDynamoDB.md
@@ -11,6 +11,7 @@ import ChangeLog from '../changelog/connector-amazondynamodb.md';
## 关键特性
- [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [x] [支持多表写入](../../introduction/concepts/connector-v2-features.md)
## 选项
@@ -21,8 +22,11 @@ import ChangeLog from '../changelog/connector-amazondynamodb.md';
| access_key_id | string | 是 | - |
| secret_access_key | string | 是 | - |
| table | string | 是 | - |
-| batch_size | string | 否 | 25 |
-| common-options | | 否 | - |
+| batch_size | int | 否 | 25 |
+| max_retries | int | 否 | 10 |
+| retry_base_delay_ms | long | 否 | 100 |
+| retry_max_delay_ms | long | 否 | 5000 |
+| common-options | | 否 | - |
### url [string]
@@ -42,7 +46,23 @@ Amazon DynamoDB的访问密钥.
### table [string]
-Amazon DynamoDB 的表名.
+Amazon DynamoDB 的表名. 支持使用 `${table_name}` 占位符,用于多表写入场景.
+
+### batch_size [int]
+
+写入 Amazon DynamoDB 前批量缓存的记录数.
+
+### max_retries [int]
+
+当 DynamoDB 返回未处理数据时,批量写入请求的最大重试次数.
+
+### retry_base_delay_ms [long]
+
+重试之间指数退避的基础延迟时间(毫秒).
+
+### retry_max_delay_ms [long]
+
+重试之间的最大延迟时间(毫秒).
### 常见选项
@@ -50,14 +70,26 @@ Sink插件常用参数,请参考 [Sink Common Options](../common-options/sink-
## 示例
+### 单表写入
```bash
-Amazondynamodb {
+AmazonDynamoDB {
url = "http://127.0.0.1:8000"
region = "us-east-1"
access_key_id = "dummy-key"
secret_access_key = "dummy-secret"
table = "TableName"
- }
+}
+```
+
+### 多表写入
+```bash
+AmazonDynamoDB {
+ url = "http://127.0.0.1:8000"
+ region = "us-east-1"
+ access_key_id = "dummy-key"
+ secret_access_key = "dummy-secret"
+ table = "${table_name}"
+}
```
## 变更日志
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
index c1f46b5e40..d6554fe6e7 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
@@ -46,6 +46,9 @@ public class AmazonDynamoDBConfig implements Serializable {
public int batchSize;
public int scanItemLimit;
public int parallelScanThreads;
+ private int maxRetries;
+ private long retryBaseDelayMs;
+ private long retryMaxDelayMs;
public AmazonDynamoDBConfig(ReadonlyConfig config) {
this.url = config.get(AmazonDynamoDBBaseOptions.URL);
@@ -60,5 +63,14 @@ public class AmazonDynamoDBConfig implements Serializable {
this.batchSize = config.get(AmazonDynamoDBSinkOptions.BATCH_SIZE);
this.scanItemLimit = config.get(AmazonDynamoDBSourceOptions.SCAN_ITEM_LIMIT);
this.parallelScanThreads = config.get(AmazonDynamoDBSourceOptions.PARALLEL_SCAN_THREADS);
+ this.maxRetries = config.get(AmazonDynamoDBSinkOptions.MAX_RETRIES);
+ if (this.maxRetries < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "max_retries must be a non-negative integer, but got: %d",
+ this.maxRetries));
+ }
+ this.retryBaseDelayMs = config.get(AmazonDynamoDBSinkOptions.RETRY_BASE_DELAY_MS);
+ this.retryMaxDelayMs = config.get(AmazonDynamoDBSinkOptions.RETRY_MAX_DELAY_MS);
}
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
index 48aebf8be1..cdab19a5c2 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
@@ -27,4 +27,22 @@ public class AmazonDynamoDBSinkOptions extends AmazonDynamoDBBaseOptions {
.intType()
.defaultValue(25)
.withDescription("The batch size of Amazon DynamoDB");
+
+ public static final Option<Integer> MAX_RETRIES =
+ Options.key("max_retries")
+ .intType()
+ .defaultValue(10)
+ .withDescription("Maximum number of retries for batch write requests");
+
+ public static final Option<Long> RETRY_BASE_DELAY_MS =
+ Options.key("retry_base_delay_ms")
+ .longType()
+ .defaultValue(100L)
+ .withDescription("Base delay in milliseconds for exponential backoff");
+
+ public static final Option<Long> RETRY_MAX_DELAY_MS =
+ Options.key("retry_max_delay_ms")
+ .longType()
+ .defaultValue(5000L)
+ .withDescription("Maximum delay in milliseconds for exponential backoff");
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
index b9a27a7c29..9da22a3ace 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
@@ -18,16 +18,17 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import java.io.IOException;
import java.util.Optional;
-public class AmazonDynamoDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class AmazonDynamoDBSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
private CatalogTable catalogTable;
@@ -50,8 +51,7 @@ public class AmazonDynamoDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
}
@Override
- public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
- throws IOException {
- return new AmazonDynamoDBWriter(amazondynamodbConfig, catalogTable.getSeaTunnelRowType());
+ public AmazonDynamoDBWriter createWriter(SinkWriter.Context context) throws IOException {
+ return new AmazonDynamoDBWriter(amazondynamodbConfig, catalogTable);
}
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
index 715da7c1b1..6dcafdc8d3 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -28,7 +29,10 @@ import com.google.auto.service.AutoService;
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.ACCESS_KEY_ID;
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.BATCH_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.MAX_RETRIES;
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.REGION;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.RETRY_BASE_DELAY_MS;
+import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.RETRY_MAX_DELAY_MS;
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.SECRET_ACCESS_KEY;
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.URL;
@@ -44,7 +48,12 @@ public class AmazonDynamoDBSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE)
- .optional(BATCH_SIZE)
+ .optional(
+ BATCH_SIZE,
+ SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA,
+ MAX_RETRIES,
+ RETRY_BASE_DELAY_MS,
+ RETRY_MAX_DELAY_MS)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
index f7e39b11a2..8badb2971b 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
@@ -27,20 +31,46 @@ import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import java.io.IOException;
import java.util.Optional;
-public class AmazonDynamoDBWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class AmazonDynamoDBWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
private final DynamoDbSinkClient dynamoDbSinkClient;
private final SeaTunnelRowSerializer serializer;
+ private final AmazonDynamoDBConfig amazondynamodbConfig;
+
+ public AmazonDynamoDBWriter(
+ AmazonDynamoDBConfig amazondynamodbConfig,
+ CatalogTable catalogTable,
+ DynamoDbSinkClient dynamoDbSinkClient) {
+ this.amazondynamodbConfig = amazondynamodbConfig;
+ this.dynamoDbSinkClient = dynamoDbSinkClient;
+
+ SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ this.serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, amazondynamodbConfig);
+ }
public AmazonDynamoDBWriter(
- AmazonDynamoDBConfig amazondynamodbConfig, SeaTunnelRowType seaTunnelRowType) {
+ AmazonDynamoDBConfig amazondynamodbConfig, CatalogTable catalogTable) {
+
+ this.amazondynamodbConfig = amazondynamodbConfig;
+
+ SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+
dynamoDbSinkClient = new DynamoDbSinkClient(amazondynamodbConfig);
serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, amazondynamodbConfig);
}
@Override
public void write(SeaTunnelRow element) throws IOException {
- dynamoDbSinkClient.write(serializer.serialize(element));
+ // In multi-table pipelines, row.tableId identifies the target table.
+ // Falls back to the configured table name for single-table usage.
+ String tableName = element.getTableId();
+
+ if (StringUtils.isEmpty(tableName)) {
+ tableName = amazondynamodbConfig.getTable();
+ }
+
+ dynamoDbSinkClient.write(serializer.serialize(element), tableName);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
index 29ac8b7d4f..4f11b7dadb 100644
--- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
@@ -19,11 +19,13 @@ package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
+import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
@@ -34,15 +36,25 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+@Slf4j
public class DynamoDbSinkClient {
private final AmazonDynamoDBConfig amazondynamodbConfig;
private volatile boolean initialize;
private DynamoDbClient dynamoDbClient;
- private final List<WriteRequest> batchList;
+ private final Map<String, List<WriteRequest>> batchListByTable;
+ private final Object lock = new Object();
+
+ public DynamoDbSinkClient(
+ AmazonDynamoDBConfig amazondynamodbConfig, DynamoDbClient dynamoDbClient) {
+ this.amazondynamodbConfig = amazondynamodbConfig;
+ this.dynamoDbClient = dynamoDbClient;
+ this.batchListByTable = new HashMap<>();
+ this.initialize = true;
+ }
public DynamoDbSinkClient(AmazonDynamoDBConfig amazondynamodbConfig) {
this.amazondynamodbConfig = amazondynamodbConfig;
- this.batchList = new ArrayList<>();
+ this.batchListByTable = new HashMap<>();
}
private void tryInit() {
@@ -64,34 +76,126 @@ public class DynamoDbSinkClient {
initialize = true;
}
- public synchronized void write(PutItemRequest putItemRequest) {
- tryInit();
- batchList.add(
- WriteRequest.builder()
- .putRequest(PutRequest.builder().item(putItemRequest.item()).build())
- .build());
- if (amazondynamodbConfig.getBatchSize() > 0
- && batchList.size() >= amazondynamodbConfig.getBatchSize()) {
- flush();
+ public void write(PutItemRequest putItemRequest, String tableName) {
+ List<WriteRequest> toFlush = null;
+
+ synchronized (lock) {
+ tryInit();
+
+ batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
+ batchListByTable
+ .get(tableName)
+ .add(
+ WriteRequest.builder()
+ .putRequest(
+ PutRequest.builder()
+ .item(putItemRequest.item())
+ .build())
+ .build());
+
+ if (amazondynamodbConfig.getBatchSize() > 0
+ && batchListByTable.get(tableName).size()
+ >= amazondynamodbConfig.getBatchSize()) {
+ // Copy batch and remove from map inside lock (fast)
+ toFlush = new ArrayList<>(batchListByTable.get(tableName));
+ batchListByTable.remove(tableName);
+ }
+ }
+
+ // Execute network I/O outside lock (other threads can continue)
+ if (toFlush != null) {
+ flushTable(tableName, toFlush);
}
}
- public synchronized void close() {
- if (dynamoDbClient != null) {
- flush();
- dynamoDbClient.close();
+ public void close() {
+ flush();
+ synchronized (lock) {
+ if (dynamoDbClient != null) {
+ dynamoDbClient.close();
+ }
}
}
- synchronized void flush() {
- if (batchList.isEmpty()) {
- return;
+ void flush() {
+ Map<String, List<WriteRequest>> batchToFlush = new HashMap<>();
+
+ synchronized (lock) {
+ if (dynamoDbClient == null || batchListByTable.isEmpty()) {
+ return;
+ }
+ batchToFlush.putAll(batchListByTable);
+ batchListByTable.clear();
+ }
+
+ for (Map.Entry<String, List<WriteRequest>> entry : batchToFlush.entrySet()) {
+ flushTable(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void flushTable(String tableName, List<WriteRequest> requests) {
+ if (!requests.isEmpty()) {
+ flushWithRetry(tableName, requests);
}
- Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
- requestItems.put(amazondynamodbConfig.getTable(), batchList);
- dynamoDbClient.batchWriteItem(
- BatchWriteItemRequest.builder().requestItems(requestItems).build());
+ }
+
+ private void flushWithRetry(String tableName, List<WriteRequest> requests) {
+ List<WriteRequest> pendingRequests = new ArrayList<>(requests);
+
+ int maxRetries = amazondynamodbConfig.getMaxRetries();
+ long baseDelayMs = amazondynamodbConfig.getRetryBaseDelayMs();
+ long maxDelayMs = amazondynamodbConfig.getRetryMaxDelayMs();
+
+ int retryCount = 0;
+
+ while (!pendingRequests.isEmpty() && retryCount <= maxRetries) {
+ Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
+ requestItems.put(tableName, pendingRequests);
+
+ BatchWriteItemResponse response =
+ dynamoDbClient.batchWriteItem(
+ BatchWriteItemRequest.builder().requestItems(requestItems).build());
+
+ Map<String, List<WriteRequest>> unprocessedKeys = response.unprocessedItems();
+ pendingRequests = unprocessedKeys.getOrDefault(tableName, new ArrayList<>());
+
+ if (!pendingRequests.isEmpty()) {
+ retryCount++;
- batchList.clear();
+ long delay = Math.min(baseDelayMs * (1L << retryCount), maxDelayMs);
+
+ long jitter = (long) (delay * Math.random() * 0.5);
+ delay += jitter;
+
+ log.warn(
+ "Retrying batch write to table '{}': attempt {}/{}, "
+ + "{} unprocessed items remaining, retrying in {} ms",
+ tableName,
+ retryCount,
+ maxRetries,
+ pendingRequests.size(),
+ delay);
+
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during retry", e);
+ }
+ }
+ }
+
+ if (!pendingRequests.isEmpty()) {
+ log.error(
+ "Failed to write {} items to table '{}' after {} retries",
+ pendingRequests.size(),
+ tableName,
+ maxRetries);
+
+ throw new RuntimeException(
+ String.format(
+ "Failed to write %d items to table %s after %d retries",
+ pendingRequests.size(), tableName, maxRetries));
+ }
}
}
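
Note on the retry delay in `flushWithRetry` above: the arithmetic can be reproduced in isolation with the minimal sketch below. `BackoffSketch`, `cappedDelay`, and `withJitter` are hypothetical names used only for illustration and are not part of the commit; the shift clamp is likewise an assumption added here to keep the sketch overflow-safe for large attempt counts. Because the diff increments `retryCount` before computing the delay, the first retry with the default options starts from 200 ms rather than 100 ms, and because jitter is added after the cap, a capped 5000 ms delay can grow to just under 7500 ms.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper mirroring the delay arithmetic in flushWithRetry; not part of the commit. */
public class BackoffSketch {

    /** Capped exponential delay before jitter: min(base * 2^attempt, max). */
    static long cappedDelay(long baseDelayMs, long maxDelayMs, int attempt) {
        // Assumption: clamp the shift so large attempt counts cannot overflow the long.
        int shift = Math.min(attempt, 30);
        return Math.min(baseDelayMs * (1L << shift), maxDelayMs);
    }

    /** Adds up to 50% random jitter on top of the capped delay, as the diff does. */
    static long withJitter(long cappedDelayMs) {
        long jitter = (long) (cappedDelayMs * ThreadLocalRandom.current().nextDouble() * 0.5);
        return cappedDelayMs + jitter;
    }

    public static void main(String[] args) {
        // Defaults from the commit: retry_base_delay_ms = 100, retry_max_delay_ms = 5000.
        for (int attempt = 1; attempt <= 10; attempt++) {
            long capped = cappedDelay(100L, 5000L, attempt);
            System.out.printf(
                    "attempt %d: capped=%d ms, sleep=%d ms%n", attempt, capped, withJitter(capped));
        }
    }
}
```

If `retry_max_delay_ms` is meant as a hard upper bound on the sleep, one option would be to apply `Math.min` again after adding the jitter; whether that is desirable is a design choice for the connector maintainers.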
diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/AmazonDynamoDBMultiTableSinkTest.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/AmazonDynamoDBMultiTableSinkTest.java
new file mode 100644
index 0000000000..055b415481
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/AmazonDynamoDBMultiTableSinkTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb;
+
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink.AmazonDynamoDBSink;
+import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink.AmazonDynamoDBWriter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AmazonDynamoDBMultiTableSinkTest {
+
+ @Test
+ public void testSinkImplementsMultiTableSinkInterface() {
+ Assertions.assertTrue(
+ SupportMultiTableSink.class.isAssignableFrom(AmazonDynamoDBSink.class),
+ "AmazonDynamoDBSink must implement SupportMultiTableSink");
+ }
+
+ @Test
+ public void testWriterImplementsMultiTableSinkWriterInterface() {
+ Assertions.assertTrue(
+ SupportMultiTableSinkWriter.class.isAssignableFrom(AmazonDynamoDBWriter.class),
+ "AmazonDynamoDBWriter must implement SupportMultiTableSinkWriter");
+ }
+}
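
Note on the per-table batching in `DynamoDbSinkClient`: the pattern of buffering rows per table under a lock and doing the slow write outside it can be sketched without the AWS SDK as follows. `PerTableBufferSketch` and its `sink` callback are hypothetical stand-ins for the client and its `BatchWriteItem` call, and the sketch detaches the full list with `Map.remove` where the commit copies the list and then removes it. It is an illustration under those assumptions, not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical, SDK-free sketch of per-table buffering; all names are illustrative. */
public class PerTableBufferSketch<T> {
    private final Map<String, List<T>> buffers = new HashMap<>();
    private final Object lock = new Object();
    private final int batchSize;
    // Stands in for the network call that writes one table's batch.
    private final BiConsumer<String, List<T>> sink;

    public PerTableBufferSketch(int batchSize, BiConsumer<String, List<T>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    public void write(String table, T item) {
        List<T> toFlush = null;
        synchronized (lock) {
            List<T> buffer = buffers.computeIfAbsent(table, k -> new ArrayList<>());
            buffer.add(item);
            if (batchSize > 0 && buffer.size() >= batchSize) {
                // Detach the full batch while holding the lock (cheap).
                toFlush = buffers.remove(table);
            }
        }
        // Slow I/O runs outside the lock so other writers are not blocked.
        if (toFlush != null) {
            sink.accept(table, toFlush);
        }
    }

    public void flush() {
        Map<String, List<T>> drained;
        synchronized (lock) {
            drained = new HashMap<>(buffers);
            buffers.clear();
        }
        drained.forEach(sink);
    }

    public static void main(String[] args) {
        PerTableBufferSketch<Integer> buffer =
                new PerTableBufferSketch<>(
                        2, (table, items) -> System.out.println(table + " <- " + items));
        buffer.write("orders", 1);
        buffer.write("users", 1);
        buffer.write("orders", 2); // "orders" reaches the batch size and is written
        buffer.flush(); // writes the remaining "users" row
    }
}
```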