This is an automated email from the ASF dual-hosted git repository. wuchunfu pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new b60ef97c95 [Improve] filestore options (#8921) b60ef97c95 is described below commit b60ef97c95158d64628f83bfec479aa48ef2852a Author: Jarvis <jar...@apache.org> AuthorDate: Fri Mar 7 19:21:30 2025 +0800 [Improve] filestore options (#8921) --- .../seatunnel/api/ConnectorOptionCheckTest.java | 1 - .../firestore/config/FirestoreParameters.java | 12 +++--- ...estoreConfig.java => FirestoreSinkOptions.java} | 2 +- .../google/firestore/sink/FirestoreSink.java | 48 ++++------------------ .../firestore/sink/FirestoreSinkFactory.java | 17 ++++++-- .../GoogleFirestoreIT.java | 8 ++-- 6 files changed, 34 insertions(+), 54 deletions(-) diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java index 90186e9a81..72b5b09c51 100644 --- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java +++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java @@ -205,7 +205,6 @@ public class ConnectorOptionCheckTest { whiteList.add("SocketSinkOptions"); whiteList.add("SelectDBSinkOptions"); whiteList.add("PrometheusSinkOptions"); - whiteList.add("FirestoreSinkOptions"); whiteList.add("MilvusSinkOptions"); whiteList.add("RocketMqSourceOptions"); whiteList.add("TablestoreSinkOptions"); diff --git a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java index bcc67a6244..21941165c3 100644 --- a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java +++ b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.google.firestore.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import lombok.Data; @@ -32,11 +32,11 @@ public class FirestoreParameters implements Serializable { private String collection; - public FirestoreParameters buildWithConfig(Config config) { - this.projectId = config.getString(FirestoreConfig.PROJECT_ID.key()); - this.collection = config.getString(FirestoreConfig.COLLECTION.key()); - if (config.hasPath(FirestoreConfig.CREDENTIALS.key())) { - this.credentials = config.getString(FirestoreConfig.CREDENTIALS.key()); + public FirestoreParameters buildWithConfig(ReadonlyConfig config) { + this.projectId = config.get(FirestoreSinkOptions.PROJECT_ID); + this.collection = config.get(FirestoreSinkOptions.COLLECTION); + if (config.getOptional(FirestoreSinkOptions.CREDENTIALS).isPresent()) { + this.credentials = config.get(FirestoreSinkOptions.CREDENTIALS); } return this; } diff --git a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreConfig.java b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreSinkOptions.java similarity index 97% rename from seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreConfig.java rename to seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreSinkOptions.java index eeb4556745..493456fc64 100644 --- a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreConfig.java +++ b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreSinkOptions.java @@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.google.firestore.config; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -public class FirestoreConfig { +public class FirestoreSinkOptions { public static final Option<String> PROJECT_ID = Options.key("project_id") diff --git a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java index 6149ba9358..cd264f6eef 100644 --- a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java +++ b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java @@ -17,70 +17,40 @@ package org.apache.seatunnel.connectors.seatunnel.google.firestore.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreParameters; -import org.apache.seatunnel.connectors.seatunnel.google.firestore.exception.FirestoreConnectorException; - -import com.google.auto.service.AutoService; import java.io.IOException; import java.util.Optional; -import static org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.COLLECTION; -import static org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.PROJECT_ID; - -@AutoService(SeaTunnelSink.class) public class FirestoreSink extends AbstractSimpleSink<SeaTunnelRow, Void> { - private SeaTunnelRowType rowType; + private final CatalogTable catalogTable; - private FirestoreParameters firestoreParameters; + private final FirestoreParameters firestoreParameters; - @Override - public String getPluginName() { - return "GoogleFirestore"; + public FirestoreSink(CatalogTable catalogTable, FirestoreParameters firestoreParameters) { + this.catalogTable = catalogTable; + this.firestoreParameters = firestoreParameters; } @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists(pluginConfig, PROJECT_ID.key(), COLLECTION.key()); - if (!result.isSuccess()) { - throw new FirestoreConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - this.firestoreParameters = new FirestoreParameters().buildWithConfig(pluginConfig); - } - - @Override - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.rowType = seaTunnelRowType; + public String getPluginName() { + return "GoogleFirestore"; } @Override public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException { - return new FirestoreSinkWriter(rowType, firestoreParameters); + return new FirestoreSinkWriter(catalogTable.getSeaTunnelRowType(), firestoreParameters); } @Override public Optional<CatalogTable> getWriteCatalogTable() { - return super.getWriteCatalogTable(); + return Optional.ofNullable(catalogTable); } } diff --git a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java index 0ee2120475..76b3496800 100644 --- a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java +++ b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java @@ -18,14 +18,17 @@ package org.apache.seatunnel.connectors.seatunnel.google.firestore.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreParameters; import com.google.auto.service.AutoService; -import static org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.COLLECTION; -import static org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.CREDENTIALS; -import static org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.PROJECT_ID; +import static org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions.COLLECTION; +import static org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions.CREDENTIALS; +import static org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions.PROJECT_ID; @AutoService(Factory.class) public class FirestoreSinkFactory implements TableSinkFactory { @@ -39,4 +42,12 @@ public class FirestoreSinkFactory implements TableSinkFactory { public OptionRule optionRule() { return OptionRule.builder().required(PROJECT_ID, COLLECTION).optional(CREDENTIALS).build(); } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + return () -> + new FirestoreSink( + context.getCatalogTable(), + new FirestoreParameters().buildWithConfig(context.getOptions())); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java index fe72624d19..2dad5db469 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.e2e.connector.google.firestore; -import org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig; +import org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.TestContainer; @@ -86,9 +86,9 @@ public class GoogleFirestoreIT extends TestSuiteBase implements TestResource { File file = ContainerUtil.getResourcesFile(FIRESTORE_CONF_FILE); Config config = ConfigFactory.parseFile(file); Config firestoreConfig = config.getConfig("sink").getConfig("GoogleFirestore"); - this.projectId = firestoreConfig.getString(FirestoreConfig.PROJECT_ID.key()); - this.collection = firestoreConfig.getString(FirestoreConfig.COLLECTION.key()); - this.credentials = firestoreConfig.getString(FirestoreConfig.CREDENTIALS.key()); + this.projectId = firestoreConfig.getString(FirestoreSinkOptions.PROJECT_ID.key()); + this.collection = firestoreConfig.getString(FirestoreSinkOptions.COLLECTION.key()); + this.credentials = firestoreConfig.getString(FirestoreSinkOptions.CREDENTIALS.key()); } @AfterAll