This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new af83a302cf [improve] socket options (#9517)
af83a302cf is described below
commit af83a302cff22f8793a0b187ed6d933fc843b39d
Author: Jarvis <[email protected]>
AuthorDate: Sat Jun 28 22:56:00 2025 +0800
[improve] socket options (#9517)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
...ConfigOptions.java => SocketCommonOptions.java} | 11 ++---
.../config/{SinkConfig.java => SocketConfig.java} | 18 +++----
...nkConfigOptions.java => SocketSinkOptions.java} | 9 +---
...ConfigOptions.java => SocketSourceOptions.java} | 19 +-------
.../seatunnel/socket/sink/SocketClient.java | 4 +-
.../seatunnel/socket/sink/SocketSink.java | 52 +++++---------------
.../seatunnel/socket/sink/SocketSinkFactory.java | 19 +++++---
.../seatunnel/socket/sink/SocketSinkWriter.java | 13 ++---
.../seatunnel/socket/source/SocketSource.java | 53 +++++++++------------
.../socket/source/SocketSourceFactory.java | 19 ++++++--
.../socket/source/SocketSourceParameter.java | 55 ----------------------
.../socket/source/SocketSourceReader.java | 5 +-
13 files changed, 83 insertions(+), 196 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 7c6d791f27..47cbd7decd 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -195,11 +195,9 @@ public class ConnectorOptionCheckTest {
whiteList.add("PulsarSinkOptions");
whiteList.add("PulsarSourceOptions");
whiteList.add("MongodbSinkOptions");
- whiteList.add("SocketSinkOptions");
whiteList.add("SelectDBSinkOptions");
whiteList.add("TablestoreSinkOptions");
whiteList.add("TableStoreDBSourceOptions");
- whiteList.add("SocketSourceOptions");
whiteList.add("PostgresIncrementalSourceOptions");
whiteList.add("SqlServerIncrementalSourceOptions");
whiteList.add("OracleIncrementalSourceOptions");
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketCommonOptions.java
similarity index 77%
copy from
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
copy to
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketCommonOptions.java
index 2de6578fd2..12e180ddc3 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketCommonOptions.java
@@ -20,18 +20,13 @@ package
org.apache.seatunnel.connectors.seatunnel.socket.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public class SocketSinkConfigOptions {
- private static final int DEFAULT_MAX_RETRIES = 3;
+public class SocketCommonOptions {
+
+ public static final String identifier = "Socket";
public static final Option<String> HOST =
Options.key("host").stringType().noDefaultValue().withDescription("socket
host");
public static final Option<Integer> PORT =
Options.key("port").intType().noDefaultValue().withDescription("socket port");
-
- public static final Option<Integer> MAX_RETRIES =
- Options.key("max_retries")
- .intType()
- .defaultValue(DEFAULT_MAX_RETRIES)
- .withDescription("default value is " + DEFAULT_MAX_RETRIES
+ ", max retries");
}
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketConfig.java
similarity index 59%
rename from
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
rename to
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketConfig.java
index d41d06fba6..7a1ff52eb2 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketConfig.java
@@ -17,27 +17,21 @@
package org.apache.seatunnel.connectors.seatunnel.socket.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Data;
import java.io.Serializable;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.MAX_RETRIES;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
-
@Data
-public class SinkConfig implements Serializable {
+public class SocketConfig implements Serializable {
private String host;
private int port;
private int maxNumRetries;
- public SinkConfig(Config config) {
- this.host = config.getString(HOST.key());
- this.port = config.getInt(PORT.key());
- if (config.hasPath(MAX_RETRIES.key())) {
- this.maxNumRetries = config.getInt(MAX_RETRIES.key());
- }
+ public SocketConfig(ReadonlyConfig config) {
+ this.host = config.get(SocketCommonOptions.HOST);
+ this.port = config.get(SocketCommonOptions.PORT);
+ this.maxNumRetries = config.get(SocketSinkOptions.MAX_RETRIES);
}
}
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkOptions.java
similarity index 80%
copy from
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
copy to
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkOptions.java
index 2de6578fd2..5364afb436 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkOptions.java
@@ -20,14 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.socket.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public class SocketSinkConfigOptions {
- private static final int DEFAULT_MAX_RETRIES = 3;
-
- public static final Option<String> HOST =
-
Options.key("host").stringType().noDefaultValue().withDescription("socket
host");
+public class SocketSinkOptions extends SocketCommonOptions {
- public static final Option<Integer> PORT =
-
Options.key("port").intType().noDefaultValue().withDescription("socket port");
+ private static final int DEFAULT_MAX_RETRIES = 3;
public static final Option<Integer> MAX_RETRIES =
Options.key("max_retries")
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSourceOptions.java
similarity index 53%
rename from
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
rename to
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSourceOptions.java
index 2de6578fd2..71823fea29 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSourceOptions.java
@@ -17,21 +17,4 @@
package org.apache.seatunnel.connectors.seatunnel.socket.config;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class SocketSinkConfigOptions {
- private static final int DEFAULT_MAX_RETRIES = 3;
-
- public static final Option<String> HOST =
-
Options.key("host").stringType().noDefaultValue().withDescription("socket
host");
-
- public static final Option<Integer> PORT =
-
Options.key("port").intType().noDefaultValue().withDescription("socket port");
-
- public static final Option<Integer> MAX_RETRIES =
- Options.key("max_retries")
- .intType()
- .defaultValue(DEFAULT_MAX_RETRIES)
- .withDescription("default value is " + DEFAULT_MAX_RETRIES
+ ", max retries");
-}
+public class SocketSourceOptions extends SocketCommonOptions {}
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
index ef44e291c9..4d5b4029f3 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.socket.sink;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
import
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorException;
@@ -42,7 +42,7 @@ public class SocketClient {
private volatile boolean isRunning = Boolean.TRUE;
private static final int CONNECTION_RETRY_DELAY = 500;
- public SocketClient(SinkConfig config, SerializationSchema
serializationSchema) {
+ public SocketClient(SocketConfig config, SerializationSchema
serializationSchema) {
this.hostName = config.getHost();
this.port = config.getPort();
this.serializationSchema = serializationSchema;
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
index 87bff65a9a..88a05072d1 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
@@ -17,69 +17,41 @@
package org.apache.seatunnel.connectors.seatunnel.socket.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
-import
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorException;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
+import
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkOptions;
import java.io.IOException;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
-
-@AutoService(SeaTunnelSink.class)
public class SocketSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private Config pluginConfig;
- private SinkConfig sinkConfig;
- private SeaTunnelRowType seaTunnelRowType;
- @Override
- public String getPluginName() {
- return "Socket";
- }
+ private final SocketConfig socketConfig;
+ private final CatalogTable catalogTable;
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
PORT.key(), HOST.key());
- if (!result.isSuccess()) {
- throw new SocketConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- sinkConfig = new SinkConfig(pluginConfig);
+ public SocketSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+ this.socketConfig = new SocketConfig(pluginConfig);
+ this.catalogTable = catalogTable;
}
@Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public String getPluginName() {
+ return SocketSinkOptions.identifier;
}
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
- return new SocketSinkWriter(sinkConfig, seaTunnelRowType);
+ return new SocketSinkWriter(socketConfig,
catalogTable.getSeaTunnelRowType());
}
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ return Optional.of(catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java
index 9f86c5e78e..6ea535923d 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java
@@ -18,24 +18,31 @@
package org.apache.seatunnel.connectors.seatunnel.socket.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.MAX_RETRIES;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
-
@AutoService(Factory.class)
public class SocketSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return "Socket";
+ return SocketSinkOptions.identifier;
}
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(HOST,
PORT).optional(MAX_RETRIES).build();
+ return OptionRule.builder()
+ .required(SocketSinkOptions.HOST, SocketSinkOptions.PORT)
+ .optional(SocketSinkOptions.MAX_RETRIES)
+ .build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new SocketSink(context.getOptions(),
context.getCatalogTable());
}
}
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
index c901abfc15..3e3cb13587 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
@@ -17,24 +17,21 @@
package org.apache.seatunnel.connectors.seatunnel.socket.sink;
-import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import java.io.IOException;
public class SocketSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private final SocketClient socketClient;
- private final SerializationSchema serializationSchema;
- private final SinkConfig sinkConfig;
- SocketSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType)
throws IOException {
- this.sinkConfig = sinkConfig;
- this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowType);
- this.socketClient = new SocketClient(sinkConfig, serializationSchema);
+ SocketSinkWriter(SocketConfig socketConfig, SeaTunnelRowType
seaTunnelRowType)
+ throws IOException {
+ this.socketClient =
+ new SocketClient(socketConfig, new
JsonSerializationSchema(seaTunnelRowType));
socketClient.open();
}
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
index a115924cfb..99d66b0034 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -17,36 +17,39 @@
package org.apache.seatunnel.connectors.seatunnel.socket.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorException;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
+import
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSourceOptions;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
+import java.util.Collections;
+import java.util.List;
-@AutoService(SeaTunnelSource.class)
public class SocketSource extends AbstractSingleSplitSource<SeaTunnelRow> {
- private SocketSourceParameter parameter;
+ private final SocketConfig parameter;
+ private final CatalogTable catalogTable;
private JobContext jobContext;
+ public SocketSource(ReadonlyConfig pluginConfig) {
+ this.parameter = new SocketConfig(pluginConfig);
+ SeaTunnelRowType seaTunnelRowType =
+ new SeaTunnelRowType(
+ new String[] {"value"}, new SeaTunnelDataType<?>[]
{BasicType.STRING_TYPE});
+ this.catalogTable =
+
CatalogTableUtil.getCatalogTable(SocketSourceOptions.identifier,
seaTunnelRowType);
+ }
+
@Override
public Boundedness getBoundedness() {
return JobMode.BATCH.equals(jobContext.getJobMode())
@@ -56,20 +59,7 @@ public class SocketSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public String getPluginName() {
- return "Socket";
- }
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
PORT.key(), HOST.key());
- if (!result.isSuccess()) {
- throw new SocketConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- this.parameter = new SocketSourceParameter(pluginConfig);
+ return SocketSourceOptions.identifier;
}
@Override
@@ -78,9 +68,8 @@ public class SocketSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return new SeaTunnelRowType(
- new String[] {"value"}, new SeaTunnelDataType<?>[]
{BasicType.STRING_TYPE});
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java
index 11ecefa718..4bc6d38210 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java
@@ -19,24 +19,35 @@ package
org.apache.seatunnel.connectors.seatunnel.socket.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSourceOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
+import java.io.Serializable;
@AutoService(Factory.class)
public class SocketSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return "Socket";
+ return SocketSourceOptions.identifier;
}
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(HOST, PORT).build();
+ return OptionRule.builder()
+ .required(SocketSourceOptions.HOST, SocketSourceOptions.PORT)
+ .build();
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new
SocketSource(context.getOptions());
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
deleted file mode 100644
index b7ce011ae0..0000000000
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.socket.source;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
-
-public class SocketSourceParameter implements Serializable {
- private final String host;
- private final Integer port;
-
- public String getHost() {
- return StringUtils.isBlank(host) ? HOST.defaultValue() : host;
- }
-
- public Integer getPort() {
- return Objects.isNull(port) ? PORT.defaultValue() : port;
- }
-
- public SocketSourceParameter(Config config) {
- if (config.hasPath(HOST.key())) {
- this.host = config.getString(HOST.key());
- } else {
- this.host = HOST.defaultValue();
- }
-
- if (config.hasPath(PORT.key())) {
- this.port = config.getInt(PORT.key());
- } else {
- this.port = PORT.defaultValue();
- }
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
index 903f948d8e..70b4326f1c 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
import lombok.extern.slf4j.Slf4j;
@@ -34,12 +35,12 @@ import java.net.Socket;
@Slf4j
public class SocketSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
private static final int CHAR_BUFFER_SIZE = 8192;
- private final SocketSourceParameter parameter;
+ private final SocketConfig parameter;
private final SingleSplitReaderContext context;
private Socket socket;
private final String delimiter = "\n";
- SocketSourceReader(SocketSourceParameter parameter,
SingleSplitReaderContext context) {
+ SocketSourceReader(SocketConfig parameter, SingleSplitReaderContext
context) {
this.parameter = parameter;
this.context = context;
}