This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 19b9d5bcce [Improve][Connectors-v2] Refactor Slack sink using Factory 
to create instance (#10514)
19b9d5bcce is described below

commit 19b9d5bcce404dd2a42fe7ff8816f5a10b7cf602
Author: Jarvis <[email protected]>
AuthorDate: Wed Mar 25 17:06:11 2026 +0800

    [Improve][Connectors-v2] Refactor Slack sink using Factory to create 
instance (#10514)
---
 .../seatunnel/slack/client/SlackClient.java        | 13 +++---
 .../connectors/seatunnel/slack/sink/SlackSink.java | 47 ++++------------------
 .../seatunnel/slack/sink/SlackSinkFactory.java     |  7 ++++
 .../seatunnel/slack/sink/SlackWriter.java          |  5 +--
 4 files changed, 23 insertions(+), 49 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
index 565d03bb69..8b037bbb5c 100644
--- 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
+++ 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.slack.client;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException;
@@ -39,10 +38,10 @@ import static 
org.apache.seatunnel.connectors.seatunnel.slack.config.SlackSinkOp
 
 @Slf4j
 public class SlackClient {
-    private final Config pluginConfig;
+    private final ReadonlyConfig pluginConfig;
     private final MethodsClient methodsClient;
 
-    public SlackClient(Config pluginConfig) {
+    public SlackClient(ReadonlyConfig pluginConfig) {
         this.pluginConfig = pluginConfig;
         this.methodsClient = Slack.getInstance().methods();
     }
@@ -58,10 +57,10 @@ public class SlackClient {
                             r ->
                                     r
                                             // The Token used to initialize app
-                                            
.token(pluginConfig.getString(OAUTH_TOKEN.key())));
+                                            
.token(pluginConfig.get(OAUTH_TOKEN)));
             channels = conversationsListResponse.getChannels();
             for (Conversation channel : channels) {
-                if 
(channel.getName().equals(pluginConfig.getString(SLACK_CHANNEL.key()))) {
+                if (channel.getName().equals(pluginConfig.get(SLACK_CHANNEL))) 
{
                     conversionId = channel.getId();
                     // Break from for loop
                     break;
@@ -84,7 +83,7 @@ public class SlackClient {
                             r ->
                                     r
                                             // The Token used to initialize app
-                                            
.token(pluginConfig.getString(SLACK_CHANNEL.key()))
+                                            
.token(pluginConfig.get(SLACK_CHANNEL))
                                             .channel(channelId)
                                             .text(text));
             publishMessageSuccess = chatPostMessageResponse.isOk();
diff --git 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
index 214a08c403..eca6806d86 100644
--- 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
+++ 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
@@ -17,44 +17,31 @@
 
 package org.apache.seatunnel.connectors.seatunnel.slack.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.slack.config.SlackSinkOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException;
-
-import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 import java.util.Optional;
 
 /** Slack sink class */
-@AutoService(SeaTunnelSink.class)
 public class SlackSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private Config pluginConfig;
-    private SeaTunnelRowType seaTunnelRowType;
+    private ReadonlyConfig pluginConfig;
+    private CatalogTable catalogTable;
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public SlackSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+        this.pluginConfig = pluginConfig;
+        this.catalogTable = catalogTable;
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
-        return new SlackWriter(seaTunnelRowType, pluginConfig);
+        return new SlackWriter(catalogTable.getSeaTunnelRowType(), 
pluginConfig);
     }
 
     @Override
@@ -62,26 +49,8 @@ public class SlackSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         return "SlackSink";
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        SlackSinkOptions.WEBHOOKS_URL.key(),
-                        SlackSinkOptions.OAUTH_TOKEN.key(),
-                        SlackSinkOptions.SLACK_CHANNEL.key());
-        if (!checkResult.isSuccess()) {
-            throw new SlackConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
checkResult.getMsg()));
-        }
-        this.pluginConfig = pluginConfig;
-    }
-
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return Optional.of(catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
index a9dd6accb6..4fbe80562e 100644
--- 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
@@ -18,8 +18,10 @@
 package org.apache.seatunnel.connectors.seatunnel.slack.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.slack.config.SlackSinkOptions;
 
 import com.google.auto.service.AutoService;
@@ -40,4 +42,9 @@ public class SlackSinkFactory implements TableSinkFactory {
                         SlackSinkOptions.SLACK_CHANNEL)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new SlackSink(context.getOptions(), 
context.getCatalogTable());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
index 2fa7faec42..7c9cf69b62 100644
--- 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
+++ 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.slack.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
@@ -39,7 +38,7 @@ public class SlackWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
     private final SeaTunnelRowType seaTunnelRowType;
     private static final long POST_MSG_WAITING_TIME = 1500L;
 
-    public SlackWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) 
{
+    public SlackWriter(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig 
pluginConfig) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.slackClient = new SlackClient(pluginConfig);
         this.conversationId = slackClient.findConversation();

Reply via email to