This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 454af515e1 [Improve][Connector-V2] MongodbSinkFactory implements TableSinkFactory (#9879)
454af515e1 is described below
commit 454af515e1e97b6ef782c0727ac7faaf1787a2ae
Author: 老王 <[email protected]>
AuthorDate: Sat Sep 20 21:44:10 2025 +0800
[Improve][Connector-V2] MongodbSinkFactory implements TableSinkFactory (#9879)
---
.../seatunnel/mongodb/sink/MongodbSink.java | 80 ++++------------------
.../seatunnel/mongodb/sink/MongodbSinkFactory.java | 40 +++++++++++
2 files changed, 52 insertions(+), 68 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index 8c86b119f9..563af55c19 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -17,17 +17,13 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter;
@@ -35,73 +31,21 @@ import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
-import com.google.auto.service.AutoService;
-
-import java.util.List;
import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
-@AutoService(SeaTunnelSink.class)
public class MongodbSink
implements SeaTunnelSink<
SeaTunnelRow, DocumentBulk, MongodbCommitInfo,
MongodbAggregatedCommitInfo> {
- private MongodbWriterOptions options;
+ private final MongodbWriterOptions options;
- private SeaTunnelRowType seaTunnelRowType;
+ private final CatalogTable catalogTable;
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- if (pluginConfig.hasPath(MongodbConfig.URI.key())
- && pluginConfig.hasPath(MongodbConfig.DATABASE.key())
- && pluginConfig.hasPath(MongodbConfig.COLLECTION.key())) {
- String connection =
pluginConfig.getString(MongodbConfig.URI.key());
- String database =
pluginConfig.getString(MongodbConfig.DATABASE.key());
- String collection =
pluginConfig.getString(MongodbConfig.COLLECTION.key());
- MongodbWriterOptions.Builder builder =
- MongodbWriterOptions.builder()
- .withConnectString(connection)
- .withDatabase(database)
- .withCollection(collection);
- if
(pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key())) {
- builder.withFlushSize(
-
pluginConfig.getInt(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key()));
- }
- if
(pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_INTERVAL.key())) {
- builder.withBatchIntervalMs(
-
pluginConfig.getLong(MongodbConfig.BUFFER_FLUSH_INTERVAL.key()));
- }
- if (pluginConfig.hasPath(MongodbConfig.PRIMARY_KEY.key())) {
- builder.withPrimaryKey(
- pluginConfig
- .getStringList(MongodbConfig.PRIMARY_KEY.key())
- .toArray(new String[0]));
- }
- List<String> fallbackKeys =
MongodbConfig.PRIMARY_KEY.getFallbackKeys();
- fallbackKeys.forEach(
- key -> {
- if (pluginConfig.hasPath(key)) {
- builder.withPrimaryKey(
-
pluginConfig.getStringList(key).toArray(new String[0]));
- }
- });
- if (pluginConfig.hasPath(MongodbConfig.UPSERT_ENABLE.key())) {
- builder.withUpsertEnable(
-
pluginConfig.getBoolean(MongodbConfig.UPSERT_ENABLE.key()));
- }
- if (pluginConfig.hasPath(MongodbConfig.RETRY_MAX.key())) {
-
builder.withRetryMax(pluginConfig.getInt(MongodbConfig.RETRY_MAX.key()));
- }
- if (pluginConfig.hasPath(MongodbConfig.RETRY_INTERVAL.key())) {
-
builder.withRetryInterval(pluginConfig.getLong(MongodbConfig.RETRY_INTERVAL.key()));
- }
-
- if (pluginConfig.hasPath(MongodbConfig.TRANSACTION.key())) {
-
builder.withTransaction(pluginConfig.getBoolean(MongodbConfig.TRANSACTION.key()));
- }
- this.options = builder.build();
- }
+ public MongodbSink(MongodbWriterOptions options, CatalogTable
catalogTable) {
+ this.options = options;
+ this.catalogTable = catalogTable;
}
@Override
@@ -109,17 +53,12 @@ public class MongodbSink
return CONNECTOR_IDENTITY;
}
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk>
createWriter(
SinkWriter.Context context) {
return new MongodbWriter(
new RowDataDocumentSerializer(
-
RowDataToBsonConverters.createConverter(seaTunnelRowType),
+
RowDataToBsonConverters.createConverter(catalogTable.getSeaTunnelRowType()),
options,
new MongoKeyExtractor(options)),
options,
@@ -148,4 +87,9 @@ public class MongodbSink
public Optional<Serializer<MongodbCommitInfo>> getCommitInfoSerializer() {
return options.transaction ? Optional.of(new DefaultSerializer<>()) :
Optional.empty();
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
index 530a12471d..d8e2cf03bc 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
@@ -17,9 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import com.google.auto.service.AutoService;
@@ -46,4 +49,41 @@ public class MongodbSinkFactory implements TableSinkFactory {
MongodbConfig.PRIMARY_KEY)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ ReadonlyConfig readonlyConfig = context.getOptions();
+ String connection = readonlyConfig.get(MongodbConfig.URI);
+ String database = readonlyConfig.get(MongodbConfig.DATABASE);
+ String collection = readonlyConfig.get(MongodbConfig.COLLECTION);
+ MongodbWriterOptions.Builder builder =
+ MongodbWriterOptions.builder()
+ .withConnectString(connection)
+ .withDatabase(database)
+ .withCollection(collection);
+ if
(readonlyConfig.getOptional(MongodbConfig.BUFFER_FLUSH_MAX_ROWS).isPresent()) {
+
builder.withFlushSize(readonlyConfig.get(MongodbConfig.BUFFER_FLUSH_MAX_ROWS));
+ }
+ if
(readonlyConfig.getOptional(MongodbConfig.BUFFER_FLUSH_INTERVAL).isPresent()) {
+
builder.withBatchIntervalMs(readonlyConfig.get(MongodbConfig.BUFFER_FLUSH_INTERVAL));
+ }
+ if (readonlyConfig.getOptional(MongodbConfig.PRIMARY_KEY).isPresent())
{
+ builder.withPrimaryKey(
+ readonlyConfig.get(MongodbConfig.PRIMARY_KEY).toArray(new
String[0]));
+ }
+ if
(readonlyConfig.getOptional(MongodbConfig.UPSERT_ENABLE).isPresent()) {
+
builder.withUpsertEnable(readonlyConfig.get(MongodbConfig.UPSERT_ENABLE));
+ }
+ if (readonlyConfig.getOptional(MongodbConfig.RETRY_MAX).isPresent()) {
+ builder.withRetryMax(readonlyConfig.get(MongodbConfig.RETRY_MAX));
+ }
+ if
(readonlyConfig.getOptional(MongodbConfig.RETRY_INTERVAL).isPresent()) {
+
builder.withRetryInterval(readonlyConfig.get(MongodbConfig.RETRY_INTERVAL));
+ }
+
+ if (readonlyConfig.getOptional(MongodbConfig.TRANSACTION).isPresent())
{
+
builder.withTransaction(readonlyConfig.get(MongodbConfig.TRANSACTION));
+ }
+ return () -> new MongodbSink(builder.build(),
context.getCatalogTable());
+ }
}