This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b60ef97c95 [Improve] filestore options (#8921)
b60ef97c95 is described below

commit b60ef97c95158d64628f83bfec479aa48ef2852a
Author: Jarvis <jar...@apache.org>
AuthorDate: Fri Mar 7 19:21:30 2025 +0800

    [Improve] filestore options (#8921)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  1 -
 .../firestore/config/FirestoreParameters.java      | 12 +++---
 ...estoreConfig.java => FirestoreSinkOptions.java} |  2 +-
 .../google/firestore/sink/FirestoreSink.java       | 48 ++++------------------
 .../firestore/sink/FirestoreSinkFactory.java       | 17 ++++++--
 .../GoogleFirestoreIT.java                         |  8 ++--
 6 files changed, 34 insertions(+), 54 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 90186e9a81..72b5b09c51 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -205,7 +205,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("SocketSinkOptions");
         whiteList.add("SelectDBSinkOptions");
         whiteList.add("PrometheusSinkOptions");
-        whiteList.add("FirestoreSinkOptions");
         whiteList.add("MilvusSinkOptions");
         whiteList.add("RocketMqSourceOptions");
         whiteList.add("TablestoreSinkOptions");
diff --git 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java
 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java
index bcc67a6244..21941165c3 100644
--- 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java
+++ 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.google.firestore.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Data;
 
@@ -32,11 +32,11 @@ public class FirestoreParameters implements Serializable {
 
     private String collection;
 
-    public FirestoreParameters buildWithConfig(Config config) {
-        this.projectId = config.getString(FirestoreConfig.PROJECT_ID.key());
-        this.collection = config.getString(FirestoreConfig.COLLECTION.key());
-        if (config.hasPath(FirestoreConfig.CREDENTIALS.key())) {
-            this.credentials = 
config.getString(FirestoreConfig.CREDENTIALS.key());
+    public FirestoreParameters buildWithConfig(ReadonlyConfig config) {
+        this.projectId = config.get(FirestoreSinkOptions.PROJECT_ID);
+        this.collection = config.get(FirestoreSinkOptions.COLLECTION);
+        if (config.getOptional(FirestoreSinkOptions.CREDENTIALS).isPresent()) {
+            this.credentials = config.get(FirestoreSinkOptions.CREDENTIALS);
         }
         return this;
     }
diff --git 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreConfig.java
 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreSinkOptions.java
similarity index 97%
rename from 
seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreConfig.java
rename to 
seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreSinkOptions.java
index eeb4556745..493456fc64 100644
--- 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreConfig.java
+++ 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreSinkOptions.java
@@ -20,7 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-public class FirestoreConfig {
+public class FirestoreSinkOptions {
 
     public static final Option<String> PROJECT_ID =
             Options.key("project_id")
diff --git 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
index 6149ba9358..cd264f6eef 100644
--- 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
+++ 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
@@ -17,70 +17,40 @@
 
 package org.apache.seatunnel.connectors.seatunnel.google.firestore.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreParameters;
-import 
org.apache.seatunnel.connectors.seatunnel.google.firestore.exception.FirestoreConnectorException;
-
-import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.COLLECTION;
-import static 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.PROJECT_ID;
-
-@AutoService(SeaTunnelSink.class)
 public class FirestoreSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private SeaTunnelRowType rowType;
+    private final CatalogTable catalogTable;
 
-    private FirestoreParameters firestoreParameters;
+    private final FirestoreParameters firestoreParameters;
 
-    @Override
-    public String getPluginName() {
-        return "GoogleFirestore";
+    public FirestoreSink(CatalogTable catalogTable, FirestoreParameters 
firestoreParameters) {
+        this.catalogTable = catalogTable;
+        this.firestoreParameters = firestoreParameters;
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(pluginConfig, PROJECT_ID.key(), 
COLLECTION.key());
-        if (!result.isSuccess()) {
-            throw new FirestoreConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        this.firestoreParameters = new 
FirestoreParameters().buildWithConfig(pluginConfig);
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.rowType = seaTunnelRowType;
+    public String getPluginName() {
+        return "GoogleFirestore";
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
-        return new FirestoreSinkWriter(rowType, firestoreParameters);
+        return new FirestoreSinkWriter(catalogTable.getSeaTunnelRowType(), 
firestoreParameters);
     }
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return Optional.ofNullable(catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java
 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java
index 0ee2120475..76b3496800 100644
--- 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java
@@ -18,14 +18,17 @@
 package org.apache.seatunnel.connectors.seatunnel.google.firestore.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreParameters;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.COLLECTION;
-import static 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.CREDENTIALS;
-import static 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.PROJECT_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions.COLLECTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions.CREDENTIALS;
+import static 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions.PROJECT_ID;
 
 @AutoService(Factory.class)
 public class FirestoreSinkFactory implements TableSinkFactory {
@@ -39,4 +42,12 @@ public class FirestoreSinkFactory implements 
TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder().required(PROJECT_ID, 
COLLECTION).optional(CREDENTIALS).build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () ->
+                new FirestoreSink(
+                        context.getCatalogTable(),
+                        new 
FirestoreParameters().buildWithConfig(context.getOptions()));
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java
index fe72624d19..2dad5db469 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.e2e.connector.google.firestore;
 
-import 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -86,9 +86,9 @@ public class GoogleFirestoreIT extends TestSuiteBase 
implements TestResource {
         File file = ContainerUtil.getResourcesFile(FIRESTORE_CONF_FILE);
         Config config = ConfigFactory.parseFile(file);
         Config firestoreConfig = 
config.getConfig("sink").getConfig("GoogleFirestore");
-        this.projectId = 
firestoreConfig.getString(FirestoreConfig.PROJECT_ID.key());
-        this.collection = 
firestoreConfig.getString(FirestoreConfig.COLLECTION.key());
-        this.credentials = 
firestoreConfig.getString(FirestoreConfig.CREDENTIALS.key());
+        this.projectId = 
firestoreConfig.getString(FirestoreSinkOptions.PROJECT_ID.key());
+        this.collection = 
firestoreConfig.getString(FirestoreSinkOptions.COLLECTION.key());
+        this.credentials = 
firestoreConfig.getString(FirestoreSinkOptions.CREDENTIALS.key());
     }
 
     @AfterAll

Reply via email to