This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 454af515e1 [Improve][Connector-V2] MongodbSinkFactory implements TableSinkFactory (#9879)
454af515e1 is described below

commit 454af515e1e97b6ef782c0727ac7faaf1787a2ae
Author: 老王 <[email protected]>
AuthorDate: Sat Sep 20 21:44:10 2025 +0800

    [Improve][Connector-V2] MongodbSinkFactory implements TableSinkFactory (#9879)
---
 .../seatunnel/mongodb/sink/MongodbSink.java        | 80 ++++------------------
 .../seatunnel/mongodb/sink/MongodbSinkFactory.java | 40 +++++++++++
 2 files changed, 52 insertions(+), 68 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index 8c86b119f9..563af55c19 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -17,17 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter;
@@ -35,73 +31,21 @@ import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
 
-import com.google.auto.service.AutoService;
-
-import java.util.List;
 import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
-@AutoService(SeaTunnelSink.class)
 public class MongodbSink
         implements SeaTunnelSink<
                 SeaTunnelRow, DocumentBulk, MongodbCommitInfo, 
MongodbAggregatedCommitInfo> {
 
-    private MongodbWriterOptions options;
+    private final MongodbWriterOptions options;
 
-    private SeaTunnelRowType seaTunnelRowType;
+    private final CatalogTable catalogTable;
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        if (pluginConfig.hasPath(MongodbConfig.URI.key())
-                && pluginConfig.hasPath(MongodbConfig.DATABASE.key())
-                && pluginConfig.hasPath(MongodbConfig.COLLECTION.key())) {
-            String connection = 
pluginConfig.getString(MongodbConfig.URI.key());
-            String database = 
pluginConfig.getString(MongodbConfig.DATABASE.key());
-            String collection = 
pluginConfig.getString(MongodbConfig.COLLECTION.key());
-            MongodbWriterOptions.Builder builder =
-                    MongodbWriterOptions.builder()
-                            .withConnectString(connection)
-                            .withDatabase(database)
-                            .withCollection(collection);
-            if 
(pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key())) {
-                builder.withFlushSize(
-                        
pluginConfig.getInt(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key()));
-            }
-            if 
(pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_INTERVAL.key())) {
-                builder.withBatchIntervalMs(
-                        
pluginConfig.getLong(MongodbConfig.BUFFER_FLUSH_INTERVAL.key()));
-            }
-            if (pluginConfig.hasPath(MongodbConfig.PRIMARY_KEY.key())) {
-                builder.withPrimaryKey(
-                        pluginConfig
-                                .getStringList(MongodbConfig.PRIMARY_KEY.key())
-                                .toArray(new String[0]));
-            }
-            List<String> fallbackKeys = 
MongodbConfig.PRIMARY_KEY.getFallbackKeys();
-            fallbackKeys.forEach(
-                    key -> {
-                        if (pluginConfig.hasPath(key)) {
-                            builder.withPrimaryKey(
-                                    
pluginConfig.getStringList(key).toArray(new String[0]));
-                        }
-                    });
-            if (pluginConfig.hasPath(MongodbConfig.UPSERT_ENABLE.key())) {
-                builder.withUpsertEnable(
-                        
pluginConfig.getBoolean(MongodbConfig.UPSERT_ENABLE.key()));
-            }
-            if (pluginConfig.hasPath(MongodbConfig.RETRY_MAX.key())) {
-                
builder.withRetryMax(pluginConfig.getInt(MongodbConfig.RETRY_MAX.key()));
-            }
-            if (pluginConfig.hasPath(MongodbConfig.RETRY_INTERVAL.key())) {
-                
builder.withRetryInterval(pluginConfig.getLong(MongodbConfig.RETRY_INTERVAL.key()));
-            }
-
-            if (pluginConfig.hasPath(MongodbConfig.TRANSACTION.key())) {
-                
builder.withTransaction(pluginConfig.getBoolean(MongodbConfig.TRANSACTION.key()));
-            }
-            this.options = builder.build();
-        }
+    public MongodbSink(MongodbWriterOptions options, CatalogTable 
catalogTable) {
+        this.options = options;
+        this.catalogTable = catalogTable;
     }
 
     @Override
@@ -109,17 +53,12 @@ public class MongodbSink
         return CONNECTOR_IDENTITY;
     }
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> 
createWriter(
             SinkWriter.Context context) {
         return new MongodbWriter(
                 new RowDataDocumentSerializer(
-                        
RowDataToBsonConverters.createConverter(seaTunnelRowType),
+                        
RowDataToBsonConverters.createConverter(catalogTable.getSeaTunnelRowType()),
                         options,
                         new MongoKeyExtractor(options)),
                 options,
@@ -148,4 +87,9 @@ public class MongodbSink
     public Optional<Serializer<MongodbCommitInfo>> getCommitInfoSerializer() {
         return options.transaction ? Optional.of(new DefaultSerializer<>()) : 
Optional.empty();
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
index 530a12471d..d8e2cf03bc 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
@@ -17,9 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 
 import com.google.auto.service.AutoService;
@@ -46,4 +49,41 @@ public class MongodbSinkFactory implements TableSinkFactory {
                         MongodbConfig.PRIMARY_KEY)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        String connection = readonlyConfig.get(MongodbConfig.URI);
+        String database = readonlyConfig.get(MongodbConfig.DATABASE);
+        String collection = readonlyConfig.get(MongodbConfig.COLLECTION);
+        MongodbWriterOptions.Builder builder =
+                MongodbWriterOptions.builder()
+                        .withConnectString(connection)
+                        .withDatabase(database)
+                        .withCollection(collection);
+        if 
(readonlyConfig.getOptional(MongodbConfig.BUFFER_FLUSH_MAX_ROWS).isPresent()) {
+            
builder.withFlushSize(readonlyConfig.get(MongodbConfig.BUFFER_FLUSH_MAX_ROWS));
+        }
+        if 
(readonlyConfig.getOptional(MongodbConfig.BUFFER_FLUSH_INTERVAL).isPresent()) {
+            
builder.withBatchIntervalMs(readonlyConfig.get(MongodbConfig.BUFFER_FLUSH_INTERVAL));
+        }
+        if (readonlyConfig.getOptional(MongodbConfig.PRIMARY_KEY).isPresent()) 
{
+            builder.withPrimaryKey(
+                    readonlyConfig.get(MongodbConfig.PRIMARY_KEY).toArray(new 
String[0]));
+        }
+        if 
(readonlyConfig.getOptional(MongodbConfig.UPSERT_ENABLE).isPresent()) {
+            
builder.withUpsertEnable(readonlyConfig.get(MongodbConfig.UPSERT_ENABLE));
+        }
+        if (readonlyConfig.getOptional(MongodbConfig.RETRY_MAX).isPresent()) {
+            builder.withRetryMax(readonlyConfig.get(MongodbConfig.RETRY_MAX));
+        }
+        if 
(readonlyConfig.getOptional(MongodbConfig.RETRY_INTERVAL).isPresent()) {
+            
builder.withRetryInterval(readonlyConfig.get(MongodbConfig.RETRY_INTERVAL));
+        }
+
+        if (readonlyConfig.getOptional(MongodbConfig.TRANSACTION).isPresent()) 
{
+            
builder.withTransaction(readonlyConfig.get(MongodbConfig.TRANSACTION));
+        }
+        return () -> new MongodbSink(builder.build(), 
context.getCatalogTable());
+    }
 }

Reply via email to