This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a65d081 [Improvement][Config] Print config in origin order (#1484)
a65d081 is described below
commit a65d081c915e4e2aee75003b742de55278c7ce5b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Mar 15 22:58:06 2022 +0800
[Improvement][Config] Print config in origin order (#1484)
* [Improvement][Config] Print config in origin order
* Optimize generic type
* Add license announce
---
LICENSE | 3 +-
.../java/org/apache/seatunnel/env/Execution.java | 5 +-
.../apache/seatunnel/flink/BaseFlinkSource.java | 4 +-
.../seatunnel/flink/batch/FlinkBatchExecution.java | 48 +-
.../seatunnel/flink/batch/FlinkBatchSink.java | 5 +-
.../seatunnel/flink/batch/FlinkBatchSource.java | 5 +-
.../seatunnel/flink/batch/FlinkBatchTransform.java | 5 +-
.../flink/stream/FlinkStreamExecution.java | 32 +-
.../seatunnel/flink/stream/FlinkStreamSink.java | 5 +-
.../seatunnel/flink/stream/FlinkStreamSource.java | 6 +-
.../flink/stream/FlinkStreamTransform.java | 5 +-
seatunnel-config/seatunnel-config-base/pom.xml | 1 +
.../typesafe/config/impl/SimpleConfigObject.java | 568 +++++++++++++++++++++
.../apache/seatunnel/flink/sink/ConsoleSink.java | 2 +-
.../org/apache/seatunnel/flink/sink/DorisSink.java | 2 +-
.../org/apache/seatunnel/flink/sink/DruidSink.java | 2 +-
.../apache/seatunnel/flink/source/DruidSource.java | 2 +-
.../apache/seatunnel/flink/sink/Elasticsearch.java | 2 +-
.../seatunnel/flink/source/FakeSourceStream.java | 2 +-
.../org/apache/seatunnel/flink/sink/FileSink.java | 2 +-
.../apache/seatunnel/flink/source/FileSource.java | 2 +-
.../apache/seatunnel/flink/sink/InfluxDbSink.java | 2 +-
.../seatunnel/flink/source/InfluxDbSource.java | 2 +-
.../org/apache/seatunnel/flink/sink/JdbcSink.java | 2 +-
.../apache/seatunnel/flink/source/JdbcSource.java | 2 +-
.../apache/seatunnel/flink/sink/KafkaTable.java | 2 +-
.../seatunnel/flink/source/KafkaTableStream.java | 2 +-
.../seatunnel/flink/source/SocketStream.java | 2 +-
.../command/flink/FlinkTaskExecuteCommand.java | 7 +-
.../org/apache/seatunnel/config/ConfigBuilder.java | 15 +-
.../flink/transform/DataStreamToTable.java | 2 +-
.../apache/seatunnel/flink/transform/Split.java | 2 +-
.../org/apache/seatunnel/flink/transform/Sql.java | 2 +-
.../flink/transform/TableToDataStream.java | 2 +-
34 files changed, 664 insertions(+), 88 deletions(-)
diff --git a/LICENSE b/LICENSE
index c83457a..5334cec 100644
--- a/LICENSE
+++ b/LICENSE
@@ -213,4 +213,5 @@
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java
from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java
from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java
from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java
from https://github.com/lightbend/config
\ No newline at end of file
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java
from https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
from https://github.com/lightbend/config
\ No newline at end of file
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
index f7aec8f..b6f4167 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
@@ -27,7 +27,10 @@ import java.util.List;
/**
* the SeaTunnel job's execution context
*/
-public interface Execution<SR extends BaseSource, TF extends BaseTransform, SK
extends BaseSink> extends Plugin<Void> {
+public interface Execution<
+ SR extends BaseSource<? extends RuntimeEnv>,
+ TF extends BaseTransform<? extends RuntimeEnv>,
+ SK extends BaseSink<? extends RuntimeEnv>> extends Plugin<Void> {
/**
* start to execute the SeaTunnel job
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java
index 1345eba..d013496 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java
@@ -22,5 +22,7 @@ import org.apache.seatunnel.apis.BaseSource;
/**
* a base interface indicates a source plugin running on Flink.
*/
-public interface BaseFlinkSource extends BaseSource<FlinkEnvironment> {
+public interface BaseFlinkSource<OUT> extends BaseSource<FlinkEnvironment> {
+
+ OUT getData(FlinkEnvironment env);
}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 3e27f9b..017f5bd 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.util.TableUtil;
-import org.apache.seatunnel.plugin.Plugin;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -29,12 +28,13 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
+import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
+import java.util.Optional;
public class FlinkBatchExecution implements Execution<FlinkBatchSource,
FlinkBatchTransform, FlinkBatchSink> {
@@ -42,7 +42,7 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
private Config config;
- private FlinkEnvironment flinkEnvironment;
+ private final FlinkEnvironment flinkEnvironment;
public FlinkBatchExecution(FlinkEnvironment flinkEnvironment) {
this.flinkEnvironment = flinkEnvironment;
@@ -50,31 +50,25 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
@Override
public void start(List<FlinkBatchSource> sources,
List<FlinkBatchTransform> transforms, List<FlinkBatchSink> sinks) throws
Exception {
- List<DataSet> data = new ArrayList<>();
+ List<DataSet<Row>> data = new ArrayList<>();
for (FlinkBatchSource source : sources) {
- DataSet dataSet = source.getData(flinkEnvironment);
+ DataSet<Row> dataSet = source.getData(flinkEnvironment);
data.add(dataSet);
- registerResultTable(source, dataSet);
+ registerResultTable(source.getConfig(), dataSet);
}
- DataSet input = data.get(0);
+ DataSet<Row> input = data.get(0);
for (FlinkBatchTransform transform : transforms) {
- DataSet dataSet = fromSourceTable(transform);
- if (Objects.isNull(dataSet)) {
- dataSet = input;
- }
+ DataSet<Row> dataSet =
fromSourceTable(transform.getConfig()).orElse(input);
input = transform.processBatch(flinkEnvironment, dataSet);
- registerResultTable(transform, input);
+ registerResultTable(transform.getConfig(), input);
transform.registerFunction(flinkEnvironment);
}
for (FlinkBatchSink sink : sinks) {
- DataSet dataSet = fromSourceTable(sink);
- if (Objects.isNull(dataSet)) {
- dataSet = input;
- }
+ DataSet<Row> dataSet =
fromSourceTable(sink.getConfig()).orElse(input);
sink.outputBatch(flinkEnvironment, dataSet);
}
@@ -88,14 +82,13 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
}
}
- private void registerResultTable(Plugin plugin, DataSet dataSet) {
- Config config = plugin.getConfig();
- if (config.hasPath(RESULT_TABLE_NAME)) {
- String name = config.getString(RESULT_TABLE_NAME);
+ private void registerResultTable(Config sourceConfig, DataSet<Row>
dataSet) {
+ if (sourceConfig.hasPath(RESULT_TABLE_NAME)) {
+ String name = sourceConfig.getString(RESULT_TABLE_NAME);
BatchTableEnvironment tableEnvironment =
flinkEnvironment.getBatchTableEnvironment();
if (!TableUtil.tableExists(tableEnvironment, name)) {
- if (config.hasPath("field_name")) {
- String fieldName = config.getString("field_name");
+ if (sourceConfig.hasPath("field_name")) {
+ String fieldName = sourceConfig.getString("field_name");
tableEnvironment.registerDataSet(name, dataSet, fieldName);
} else {
tableEnvironment.registerDataSet(name, dataSet);
@@ -104,14 +97,13 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
}
}
- private DataSet fromSourceTable(Plugin plugin) {
- Config config = plugin.getConfig();
- if (config.hasPath(SOURCE_TABLE_NAME)) {
+ private Optional<DataSet<Row>> fromSourceTable(Config pluginConfig) {
+ if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
BatchTableEnvironment tableEnvironment =
flinkEnvironment.getBatchTableEnvironment();
- Table table =
tableEnvironment.scan(config.getString(SOURCE_TABLE_NAME));
- return TableUtil.tableToDataSet(tableEnvironment, table);
+ Table table =
tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
+ return
Optional.ofNullable(TableUtil.tableToDataSet(tableEnvironment, table));
}
- return null;
+ return Optional.empty();
}
@Override
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java
index f59c09e..f09abcb 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java
@@ -22,14 +22,15 @@ import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.types.Row;
import javax.annotation.Nullable;
/**
* a FlinkBatchSink plugin will write data to other system using Flink DataSet
API.
*/
-public interface FlinkBatchSink<IN, OUT> extends BaseFlinkSink {
+public interface FlinkBatchSink extends BaseFlinkSink {
@Nullable
- DataSink<OUT> outputBatch(FlinkEnvironment env, DataSet<IN> inDataSet);
+ DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet);
}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java
index 9d71f10..6047236 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java
@@ -21,11 +21,12 @@ import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.types.Row;
/**
* a FlinkBatchSource plugin will read data from other system using Flink
DataSet API.
*/
-public interface FlinkBatchSource<T> extends BaseFlinkSource {
+public interface FlinkBatchSource extends BaseFlinkSource<DataSet<Row>> {
- DataSet<T> getData(FlinkEnvironment env);
+ DataSet<Row> getData(FlinkEnvironment env);
}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java
index 3a81c66..ba63722 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java
@@ -21,12 +21,13 @@ import org.apache.seatunnel.flink.BaseFlinkTransform;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.types.Row;
/**
* a FlinkBatchTransform plugin will do transformations to Flink DataSet.
*/
-public interface FlinkBatchTransform<IN, OUT> extends BaseFlinkTransform {
+public interface FlinkBatchTransform extends BaseFlinkTransform {
- DataSet<OUT> processBatch(FlinkEnvironment env, DataSet<IN> data);
+ DataSet<Row> processBatch(FlinkEnvironment env, DataSet<Row> data);
}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index e3a647b..753050e 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -28,12 +28,13 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
+import java.util.Optional;
public class FlinkStreamExecution implements Execution<FlinkStreamSource,
FlinkStreamTransform, FlinkStreamSink> {
@@ -41,7 +42,7 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
private Config config;
- private FlinkEnvironment flinkEnvironment;
+ private final FlinkEnvironment flinkEnvironment;
public FlinkStreamExecution(FlinkEnvironment streamEnvironment) {
this.flinkEnvironment = streamEnvironment;
@@ -49,31 +50,25 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
@Override
public void start(List<FlinkStreamSource> sources,
List<FlinkStreamTransform> transforms, List<FlinkStreamSink> sinks) throws
Exception {
- List<DataStream> data = new ArrayList<>();
+ List<DataStream<Row>> data = new ArrayList<>();
for (FlinkStreamSource source : sources) {
- DataStream dataStream = source.getData(flinkEnvironment);
+ DataStream<Row> dataStream = source.getData(flinkEnvironment);
data.add(dataStream);
registerResultTable(source, dataStream);
}
- DataStream input = data.get(0);
+ DataStream<Row> input = data.get(0);
for (FlinkStreamTransform transform : transforms) {
- DataStream stream = fromSourceTable(transform);
- if (Objects.isNull(stream)) {
- stream = input;
- }
+ DataStream<Row> stream =
fromSourceTable(transform.getConfig()).orElse(input);
input = transform.processStream(flinkEnvironment, stream);
registerResultTable(transform, input);
transform.registerFunction(flinkEnvironment);
}
for (FlinkStreamSink sink : sinks) {
- DataStream stream = fromSourceTable(sink);
- if (Objects.isNull(stream)) {
- stream = input;
- }
+ DataStream<Row> stream =
fromSourceTable(sink.getConfig()).orElse(input);
sink.outputStream(flinkEnvironment, stream);
}
try {
@@ -101,14 +96,13 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
}
}
- private DataStream fromSourceTable(Plugin plugin) {
- Config config = plugin.getConfig();
- if (config.hasPath(SOURCE_TABLE_NAME)) {
+ private Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
+ if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
StreamTableEnvironment tableEnvironment =
flinkEnvironment.getStreamTableEnvironment();
- Table table =
tableEnvironment.scan(config.getString(SOURCE_TABLE_NAME));
- return TableUtil.tableToDataStream(tableEnvironment, table, true);
+ Table table =
tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
+ return
Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
}
- return null;
+ return Optional.empty();
}
@Override
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java
index ecb768d..b83e550 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java
@@ -22,15 +22,16 @@ import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.types.Row;
import javax.annotation.Nullable;
/**
* a FlinkStreamSink plugin will write data to other system using Flink
DataStream API.
*/
-public interface FlinkStreamSink<IN, OUT> extends BaseFlinkSink {
+public interface FlinkStreamSink extends BaseFlinkSink {
@Nullable
- DataStreamSink<OUT> outputStream(FlinkEnvironment env, DataStream<IN>
dataStream);
+ DataStreamSink<Row> outputStream(FlinkEnvironment env, DataStream<Row>
dataStream);
}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java
index c5b9156..096b665 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java
@@ -21,12 +21,12 @@ import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
/**
* a FlinkStreamSource plugin will read data from other system using Flink
DataStream API.
*/
-public interface FlinkStreamSource<T> extends BaseFlinkSource {
-
- DataStream<T> getData(FlinkEnvironment env);
+public interface FlinkStreamSource extends BaseFlinkSource<DataStream<Row>> {
+ DataStream<Row> getData(FlinkEnvironment env);
}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java
index 3df88aa..55d9e2d 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java
@@ -21,11 +21,12 @@ import org.apache.seatunnel.flink.BaseFlinkTransform;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
/**
* a FlinkBatchTransform plugin will do transformations to Flink DataStream.
*/
-public interface FlinkStreamTransform<IN, OUT> extends BaseFlinkTransform {
+public interface FlinkStreamTransform extends BaseFlinkTransform {
- DataStream<OUT> processStream(FlinkEnvironment env, DataStream<IN>
dataStream);
+ DataStream<Row> processStream(FlinkEnvironment env, DataStream<Row>
dataStream);
}
diff --git a/seatunnel-config/seatunnel-config-base/pom.xml b/seatunnel-config/seatunnel-config-base/pom.xml
index 8d1a8b7..bac9f61 100644
--- a/seatunnel-config/seatunnel-config-base/pom.xml
+++ b/seatunnel-config/seatunnel-config-base/pom.xml
@@ -80,6 +80,7 @@
<exclude>com/typesafe/config/impl/ConfigNodePath.class</exclude>
<exclude>com/typesafe/config/impl/PathParser.class</exclude>
<exclude>com/typesafe/config/impl/Path.class</exclude>
+
<exclude>com/typesafe/config/impl/SimpleConfigObject.class</exclude>
</excludes>
</filter>
</filters>
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
new file mode 100644
index 0000000..ed52f4b
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
@@ -0,0 +1,568 @@
+/*
+ * Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigOrigin;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+final class SimpleConfigObject extends AbstractConfigObject implements
Serializable {
+ private static final long serialVersionUID = 2L;
+ private final Map<String, AbstractConfigValue> value;
+ private final boolean resolved;
+ private final boolean ignoresFallbacks;
+ private static final SimpleConfigObject EMPTY_INSTANCE =
empty(SimpleConfigOrigin.newSimple("empty config"));
+ private static final int HASH_CODE = 41;
+
+ SimpleConfigObject(ConfigOrigin origin, Map<String, AbstractConfigValue>
value, ResolveStatus status, boolean ignoresFallbacks) {
+ super(origin);
+ if (value == null) {
+ throw new ConfigException.BugOrBroken("creating config object with
null map");
+ } else {
+ this.value = value;
+ this.resolved = status == ResolveStatus.RESOLVED;
+ this.ignoresFallbacks = ignoresFallbacks;
+ if (status != ResolveStatus.fromValues(value.values())) {
+ throw new ConfigException.BugOrBroken("Wrong resolved status
on " + this);
+ }
+ }
+ }
+
+ SimpleConfigObject(ConfigOrigin origin, Map<String, AbstractConfigValue>
value) {
+ this(origin, value, ResolveStatus.fromValues(value.values()), false);
+ }
+
+ public SimpleConfigObject withOnlyKey(String key) {
+ return this.withOnlyPath(Path.newKey(key));
+ }
+
+ public SimpleConfigObject withoutKey(String key) {
+ return this.withoutPath(Path.newKey(key));
+ }
+
+ protected SimpleConfigObject withOnlyPathOrNull(Path path) {
+ String key = path.first();
+ Path next = path.remainder();
+ AbstractConfigValue v = this.value.get(key);
+ if (next != null) {
+ if (v instanceof AbstractConfigObject) {
+ v = ((AbstractConfigObject) v).withOnlyPathOrNull(next);
+ } else {
+ v = null;
+ }
+ }
+
+ return v == null ? null : new SimpleConfigObject(this.origin(),
Collections.singletonMap(key, v), v.resolveStatus(), this.ignoresFallbacks);
+ }
+
+ SimpleConfigObject withOnlyPath(Path path) {
+ SimpleConfigObject o = this.withOnlyPathOrNull(path);
+ return o == null ? new SimpleConfigObject(this.origin(),
Collections.emptyMap(), ResolveStatus.RESOLVED, this.ignoresFallbacks) : o;
+ }
+
+ SimpleConfigObject withoutPath(Path path) {
+ String key = path.first();
+ Path next = path.remainder();
+ AbstractConfigValue v = this.value.get(key);
+ HashMap<String, AbstractConfigValue> smaller;
+ if (next != null && v instanceof AbstractConfigObject) {
+ v = ((AbstractConfigObject) v).withoutPath(next);
+ smaller = new HashMap<>(this.value);
+ smaller.put(key, v);
+ return new SimpleConfigObject(this.origin(), smaller,
ResolveStatus.fromValues(smaller.values()), this.ignoresFallbacks);
+ } else if (next == null && v != null) {
+ smaller = new HashMap<>(this.value.size() - 1);
+
+ for (Entry<String, AbstractConfigValue>
stringAbstractConfigValueEntry : this.value.entrySet()) {
+ if (!stringAbstractConfigValueEntry.getKey().equals(key)) {
+ smaller.put(stringAbstractConfigValueEntry.getKey(),
stringAbstractConfigValueEntry.getValue());
+ }
+ }
+
+ return new SimpleConfigObject(this.origin(), smaller,
ResolveStatus.fromValues(smaller.values()), this.ignoresFallbacks);
+ } else {
+ return this;
+ }
+ }
+
+ public SimpleConfigObject withValue(String key, ConfigValue v) {
+ if (v == null) {
+ throw new ConfigException.BugOrBroken("Trying to store null
ConfigValue in a ConfigObject");
+ } else {
+ Map newMap;
+ if (this.value.isEmpty()) {
+ newMap = Collections.singletonMap(key, (AbstractConfigValue)
v);
+ } else {
+ newMap = new HashMap<>(this.value);
+ newMap.put(key, v);
+ }
+
+ return new SimpleConfigObject(this.origin(), newMap,
ResolveStatus.fromValues(newMap.values()), this.ignoresFallbacks);
+ }
+ }
+
+ SimpleConfigObject withValue(Path path, ConfigValue v) {
+ String key = path.first();
+ Path next = path.remainder();
+ if (next == null) {
+ return this.withValue(key, v);
+ } else {
+ AbstractConfigValue child = this.value.get(key);
+ if (child instanceof AbstractConfigObject) {
+ return this.withValue(key, ((AbstractConfigObject)
child).withValue(next, v));
+ } else {
+ SimpleConfig subtree = ((AbstractConfigValue)
v).atPath(SimpleConfigOrigin.newSimple("withValue(" + next.render() + ")"),
next);
+ return this.withValue(key, subtree.root());
+ }
+ }
+ }
+
+ protected AbstractConfigValue attemptPeekWithPartialResolve(String key) {
+ return this.value.get(key);
+ }
+
+ private SimpleConfigObject newCopy(ResolveStatus newStatus, ConfigOrigin
newOrigin, boolean newIgnoresFallbacks) {
+ return new SimpleConfigObject(newOrigin, this.value, newStatus,
newIgnoresFallbacks);
+ }
+
+ protected SimpleConfigObject newCopy(ResolveStatus newStatus, ConfigOrigin
newOrigin) {
+ return this.newCopy(newStatus, newOrigin, this.ignoresFallbacks);
+ }
+
+ protected SimpleConfigObject withFallbacksIgnored() {
+ return this.ignoresFallbacks ? this :
this.newCopy(this.resolveStatus(), this.origin(), true);
+ }
+
+ ResolveStatus resolveStatus() {
+ return ResolveStatus.fromBoolean(this.resolved);
+ }
+
+ public SimpleConfigObject replaceChild(AbstractConfigValue child,
AbstractConfigValue replacement) {
+ Map<String, AbstractConfigValue> newChildren = new
HashMap<>(this.value);
+ Iterator<Entry<String, AbstractConfigValue>> var4 =
newChildren.entrySet().iterator();
+
+ Entry<String, AbstractConfigValue> old;
+ do {
+ if (!var4.hasNext()) {
+ throw new
ConfigException.BugOrBroken("SimpleConfigObject.replaceChild did not find " +
child + " in " + this);
+ }
+
+ old = var4.next();
+ } while (old.getValue() != child);
+
+ if (replacement != null) {
+ old.setValue(replacement);
+ } else {
+ newChildren.remove(old.getKey());
+ }
+
+ return new SimpleConfigObject(this.origin(), newChildren,
ResolveStatus.fromValues(newChildren.values()), this.ignoresFallbacks);
+ }
+
+ public boolean hasDescendant(AbstractConfigValue descendant) {
+ Iterator<AbstractConfigValue> var2 = this.value.values().iterator();
+
+ AbstractConfigValue child;
+ do {
+ if (!var2.hasNext()) {
+ var2 = this.value.values().iterator();
+
+ do {
+ if (!var2.hasNext()) {
+ return false;
+ }
+
+ child = var2.next();
+ } while (!(child instanceof Container) || !((Container)
child).hasDescendant(descendant));
+
+ return true;
+ }
+
+ child = var2.next();
+ } while (child != descendant);
+
+ return true;
+ }
+
+ protected boolean ignoresFallbacks() {
+ return this.ignoresFallbacks;
+ }
+
+ public Map<String, Object> unwrapped() {
+ Map<String, Object> m = new HashMap<>();
+
+ for (Entry<String, AbstractConfigValue> stringAbstractConfigValueEntry
: this.value.entrySet()) {
+ m.put(stringAbstractConfigValueEntry.getKey(),
stringAbstractConfigValueEntry.getValue().unwrapped());
+ }
+
+ return m;
+ }
+
+ protected SimpleConfigObject mergedWithObject(AbstractConfigObject
abstractFallback) {
+ this.requireNotIgnoringFallbacks();
+ if (!(abstractFallback instanceof SimpleConfigObject)) {
+ throw new ConfigException.BugOrBroken("should not be reached
(merging non-SimpleConfigObject)");
+ } else {
+ SimpleConfigObject fallback = (SimpleConfigObject)
abstractFallback;
+ boolean changed = false;
+ boolean allResolved = true;
+ Map<String, AbstractConfigValue> merged = new HashMap<>();
+ Set<String> allKeys = new HashSet<>();
+ allKeys.addAll(this.keySet());
+ allKeys.addAll(fallback.keySet());
+
+ for (String key : allKeys) {
+ AbstractConfigValue first = this.value.get(key);
+ AbstractConfigValue second = fallback.value.get(key);
+ AbstractConfigValue kept;
+ if (first == null) {
+ kept = second;
+ } else if (second == null) {
+ kept = first;
+ } else {
+ kept = first.withFallback(second);
+ }
+
+ merged.put(key, kept);
+ if (first != kept) {
+ changed = true;
+ }
+
+ if (kept.resolveStatus() == ResolveStatus.UNRESOLVED) {
+ allResolved = false;
+ }
+ }
+
+ ResolveStatus newResolveStatus =
ResolveStatus.fromBoolean(allResolved);
+ boolean newIgnoresFallbacks = fallback.ignoresFallbacks();
+ if (changed) {
+ return new SimpleConfigObject(mergeOrigins(this, fallback),
merged, newResolveStatus, newIgnoresFallbacks);
+ } else if (newResolveStatus == this.resolveStatus() &&
newIgnoresFallbacks == this.ignoresFallbacks()) {
+ return this;
+ } else {
+ return this.newCopy(newResolveStatus, this.origin(),
newIgnoresFallbacks);
+ }
+ }
+ }
+
+ private SimpleConfigObject modify(NoExceptionsModifier modifier) {
+ try {
+ return this.modifyMayThrow(modifier);
+ } catch (RuntimeException var3) {
+ throw var3;
+ } catch (Exception var4) {
+ throw new ConfigException.BugOrBroken("unexpected checked
exception", var4);
+ }
+ }
+
+ private SimpleConfigObject modifyMayThrow(Modifier modifier) throws
Exception {
+ Map<String, AbstractConfigValue> changes = null;
+
+ for (String k : this.keySet()) {
+ AbstractConfigValue v = this.value.get(k);
+ AbstractConfigValue modified = modifier.modifyChildMayThrow(k, v);
+ if (modified != v) {
+ if (changes == null) {
+ changes = new HashMap<>();
+ }
+
+ changes.put(k, modified);
+ }
+ }
+
+ if (changes == null) {
+ return this;
+ } else {
+ Map<String, AbstractConfigValue> modified = new HashMap<>();
+ boolean sawUnresolved = false;
+
+ for (String k : this.keySet()) {
+ AbstractConfigValue newValue;
+ if (changes.containsKey(k)) {
+ newValue = changes.get(k);
+ if (newValue != null) {
+ modified.put(k, newValue);
+ if (newValue.resolveStatus() ==
ResolveStatus.UNRESOLVED) {
+ sawUnresolved = true;
+ }
+ }
+ } else {
+ newValue = this.value.get(k);
+ modified.put(k, newValue);
+ if (newValue.resolveStatus() == ResolveStatus.UNRESOLVED) {
+ sawUnresolved = true;
+ }
+ }
+ }
+
+ return new SimpleConfigObject(this.origin(), modified,
sawUnresolved ? ResolveStatus.UNRESOLVED : ResolveStatus.RESOLVED,
this.ignoresFallbacks());
+ }
+ }
+
+ /**
+  * Resolves the children of this object.
+  *
+  * <p>An already RESOLVED object is returned as-is. Otherwise this object is pushed as
+  * parent onto {@code source} and each child is processed by a {@link ResolveModifier};
+  * the context carried by that modifier after the pass is the one returned in the result.
+  *
+  * @throws NotPossibleToResolve propagated unchanged from child resolution
+  */
+ ResolveResult<? extends AbstractConfigObject> resolveSubstitutions(ResolveContext context, ResolveSource source) throws NotPossibleToResolve {
+     if (this.resolveStatus() == ResolveStatus.RESOLVED) {
+         return ResolveResult.make(context, this);
+     }
+
+     ResolveSource sourceWithParent = source.pushParent(this);
+     try {
+         SimpleConfigObject.ResolveModifier modifier = new SimpleConfigObject.ResolveModifier(context, sourceWithParent);
+         AbstractConfigValue resolved = this.modifyMayThrow(modifier);
+         return ResolveResult.make(modifier.context, resolved).asObjectResult();
+     } catch (NotPossibleToResolve | RuntimeException e) {
+         throw e;
+     } catch (Exception e) {
+         // The message literal must stay on one line; a wrapped literal does not compile.
+         throw new ConfigException.BugOrBroken("unexpected checked exception", e);
+     }
+ }
+
+ /**
+  * Returns the object obtained by calling {@code relativized(prefix)} on every child.
+  */
+ SimpleConfigObject relativized(final Path prefix) {
+     NoExceptionsModifier relativizer = new NoExceptionsModifier() {
+         public AbstractConfigValue modifyChild(String key, AbstractConfigValue child) {
+             return child.relativized(prefix);
+         }
+     };
+     return this.modify(relativizer);
+ }
+
+ protected void render(StringBuilder sb, int indent, boolean atRoot,
ConfigRenderOptions options) {
+ if (this.isEmpty()) {
+ sb.append("{}");
+ } else {
+ boolean outerBraces = options.getJson() || !atRoot;
+ int innerIndent;
+ if (outerBraces) {
+ innerIndent = indent + 1;
+ sb.append("{");
+ if (options.getFormatted()) {
+ sb.append('\n');
+ }
+ } else {
+ innerIndent = indent;
+ }
+
+ int separatorCount = 0;
+ String[] keys = this.keySet().toArray(new String[0]);
+
+ for (String k : keys) {
+ AbstractConfigValue v = this.value.get(k);
+ if (options.getOriginComments()) {
+ String[] lines = v.origin().description().split("\n");
+
+ for (String l : lines) {
+ indent(sb, indent + 1, options);
+ sb.append('#');
+ if (!l.isEmpty()) {
+ sb.append(' ');
+ }
+
+ sb.append(l);
+ sb.append("\n");
+ }
+ }
+
+ if (options.getComments()) {
+
+ for (String comment : v.origin().comments()) {
+ indent(sb, innerIndent, options);
+ sb.append("#");
+ if (!comment.startsWith(" ")) {
+ sb.append(' ');
+ }
+
+ sb.append(comment);
+ sb.append("\n");
+ }
+ }
+
+ indent(sb, innerIndent, options);
+ v.render(sb, innerIndent, false, k, options);
+ if (options.getFormatted()) {
+ if (options.getJson()) {
+ sb.append(",");
+ separatorCount = 2;
+ } else {
+ separatorCount = 1;
+ }
+
+ sb.append('\n');
+ } else {
+ sb.append(",");
+ separatorCount = 1;
+ }
+ }
+
+ sb.setLength(sb.length() - separatorCount);
+ if (outerBraces) {
+ if (options.getFormatted()) {
+ sb.append('\n');
+ indent(sb, indent, options);
+ }
+
+ sb.append("}");
+ }
+ }
+
+ if (atRoot && options.getFormatted()) {
+ sb.append('\n');
+ }
+
+ }
+
+ /** Looks up {@code key} in the backing map. */
+ public AbstractConfigValue get(Object key) {
+     return value.get(key);
+ }
+
+ /**
+  * Compares two maps: same instance, or equal key sets with pairwise-equal values.
+  */
+ private static boolean mapEquals(Map<String, ConfigValue> a, Map<String, ConfigValue> b) {
+     if (a == b) {
+         return true;
+     }
+
+     Set<String> leftKeys = a.keySet();
+     if (!leftKeys.equals(b.keySet())) {
+         return false;
+     }
+
+     for (String key : leftKeys) {
+         if (!a.get(key).equals(b.get(key))) {
+             return false;
+         }
+     }
+     return true;
+ }
+
+ /**
+  * Hashes a map from its sorted key list and the sum of its value hashes, so the result
+  * does not depend on the map's iteration order.
+  */
+ @SuppressWarnings("magicnumber")
+ private static int mapHash(Map<String, ConfigValue> m) {
+     List<String> sortedKeys = new ArrayList<>(m.keySet());
+     Collections.sort(sortedKeys);
+
+     int valuesHash = 0;
+     for (String key : sortedKeys) {
+         valuesHash += m.get(key).hashCode();
+     }
+
+     return HASH_CODE * (HASH_CODE + sortedKeys.hashCode()) + valuesHash;
+ }
+
+ /** Any {@link ConfigObject} is an acceptable comparison partner. */
+ protected boolean canEqual(Object other) {
+     boolean isConfigObject = other instanceof ConfigObject;
+     return isConfigObject;
+ }
+
+ /**
+  * Equal to any {@link ConfigObject} that passes {@link #canEqual(Object)} and has the
+  * same keys mapped to equal values.
+  */
+ public boolean equals(Object other) {
+     return other instanceof ConfigObject
+         && this.canEqual(other)
+         && mapEquals(this, (ConfigObject) other);
+ }
+
+ /** Hash derived from the entries via {@code mapHash}. */
+ public int hashCode() {
+     final int hash = mapHash(this);
+     return hash;
+ }
+
+ /** Delegates to the backing map. */
+ public boolean containsKey(Object key) {
+     return value.containsKey(key);
+ }
+
+ /** Returns the backing map's own key set (not a copy). */
+ public Set<String> keySet() {
+     return value.keySet();
+ }
+
+ /** Delegates to the backing map. */
+ public boolean containsValue(Object v) {
+     return value.containsValue(v);
+ }
+
+ /**
+  * Returns a fresh set of immutable entries copied from the backing map.
+  *
+  * <p>The copy is needed because the backing map holds {@code AbstractConfigValue} while
+  * callers expect {@code ConfigValue} entries. An insertion-ordered set is used so the
+  * entries come back in the backing map's iteration order instead of hash order.
+  */
+ public Set<Entry<String, ConfigValue>> entrySet() {
+     Set<Entry<String, ConfigValue>> entries = new java.util.LinkedHashSet<>();
+
+     for (Entry<String, AbstractConfigValue> entry : this.value.entrySet()) {
+         entries.add(new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue()));
+     }
+
+     return entries;
+ }
+
+ /** Delegates to the backing map. */
+ public boolean isEmpty() {
+     return value.isEmpty();
+ }
+
+ /** Number of keys in the backing map. */
+ public int size() {
+     return value.size();
+ }
+
+ /**
+  * Returns a copy of the child values as a set, so equal values appear only once.
+  * The set is insertion-ordered, which keeps the values in the backing map's iteration
+  * order instead of hash order.
+  */
+ public Collection<ConfigValue> values() {
+     return new java.util.LinkedHashSet<>(this.value.values());
+ }
+
+ static SimpleConfigObject empty() {
+ return EMPTY_INSTANCE;
+ }
+
+ /**
+  * Returns an empty object carrying {@code origin}, or the result of {@link #empty()}
+  * when {@code origin} is {@code null}.
+  */
+ static SimpleConfigObject empty(ConfigOrigin origin) {
+     if (origin == null) {
+         return empty();
+     }
+     return new SimpleConfigObject(origin, Collections.emptyMap());
+ }
+
+ /**
+  * Returns an empty object whose origin description is {@code baseOrigin}'s description
+  * followed by {@code " (not found)"}.
+  */
+ static SimpleConfigObject emptyMissing(ConfigOrigin baseOrigin) {
+     // The suffix literal must stay on one line; a wrapped literal does not compile.
+     String description = baseOrigin.description() + " (not found)";
+     return new SimpleConfigObject(SimpleConfigOrigin.newSimple(description), Collections.emptyMap());
+ }
+
+ /** Serialization hook: substitutes a {@code SerializedConfigValue} wrapping this object. */
+ private Object writeReplace() throws ObjectStreamException {
+     SerializedConfigValue proxy = new SerializedConfigValue(this);
+     return proxy;
+ }
+
+ /**
+  * {@link Modifier} that resolves each child and carries the evolving
+  * {@link ResolveContext} from one child to the next in its {@code context} field.
+  */
+ private static final class ResolveModifier implements Modifier {
+     final Path originalRestrict;
+     ResolveContext context;
+     final ResolveSource source;
+
+     ResolveModifier(ResolveContext context, ResolveSource source) {
+         this.context = context;
+         this.source = source;
+         this.originalRestrict = context.restrictToChild();
+     }
+
+     /**
+      * With an unrestricted context every child is resolved. With a restricted context
+      * only the child named by the first path element is resolved, and only when the
+      * restriction path has a remainder; every other child is returned unchanged.
+      */
+     public AbstractConfigValue modifyChildMayThrow(String key, AbstractConfigValue v) throws NotPossibleToResolve {
+         if (!this.context.isRestrictedToChild()) {
+             return resolveAndTrack(this.context.unrestricted(), v);
+         }
+
+         if (!key.equals(this.context.restrictToChild().first())) {
+             return v;
+         }
+
+         Path remainder = this.context.restrictToChild().remainder();
+         if (remainder == null) {
+             return v;
+         }
+         return resolveAndTrack(this.context.restrict(remainder), v);
+     }
+
+     /** Resolves {@code v} with {@code ctx}, then re-applies the original restriction to the resulting context. */
+     private AbstractConfigValue resolveAndTrack(ResolveContext ctx, AbstractConfigValue v) throws NotPossibleToResolve {
+         ResolveResult<? extends AbstractConfigValue> result = ctx.resolve(v, this.source);
+         this.context = result.context.unrestricted().restrict(this.originalRestrict);
+         return result.value;
+     }
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
index 422ea43..e57d72e 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
@@ -34,7 +34,7 @@ import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConsoleSink extends RichOutputFormat<Row> implements
FlinkBatchSink<Row, Row>, FlinkStreamSink<Row, Row> {
+public class ConsoleSink extends RichOutputFormat<Row> implements
FlinkBatchSink, FlinkStreamSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsoleSink.class);
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
index 7fe406f..eeb1e87 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
@@ -41,7 +41,7 @@ import javax.annotation.Nullable;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-public class DorisSink implements FlinkStreamSink<Row, Row>,
FlinkBatchSink<Row, Row> {
+public class DorisSink implements FlinkStreamSink, FlinkBatchSink {
private static final long serialVersionUID = 4747849769146047770L;
private static final int DEFAULT_BATCH_SIZE = 100;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
index 82c8f86..aa3746c 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.types.Row;
-public class DruidSink implements FlinkBatchSink<Row, Row> {
+public class DruidSink implements FlinkBatchSink {
private static final long serialVersionUID = -2967782261362988646L;
private static final String COORDINATOR_URL = "coordinator_url";
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
index b271ff0..afa5eb6 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
@@ -52,7 +52,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
-public class DruidSource implements FlinkBatchSource<Row> {
+public class DruidSource implements FlinkBatchSource {
private static final long serialVersionUID = 8152628883440481281L;
private static final Logger LOGGER =
LoggerFactory.getLogger(DruidSource.class);
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
index be235c0..6965947 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
@@ -55,7 +55,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class Elasticsearch implements FlinkStreamSink<Row, Row>,
FlinkBatchSink<Row, Row> {
+public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
private static final long serialVersionUID = 8445868321245456793L;
private static final int DEFAULT_CONFIG_SIZE = 3;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
index 69a2fff..b7915dd 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
@@ -35,7 +35,7 @@ import org.apache.flink.types.Row;
import java.util.concurrent.TimeUnit;
-public class FakeSourceStream extends RichParallelSourceFunction<Row>
implements FlinkStreamSource<Row> {
+public class FakeSourceStream extends RichParallelSourceFunction<Row>
implements FlinkStreamSource {
private static final long serialVersionUID = -3026082767246767679L;
private volatile boolean running = true;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index 4569343..defaef7 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
import java.io.PrintStream;
-public class FileSink implements FlinkStreamSink<Row, Row>,
FlinkBatchSink<Row, Row> {
+public class FileSink implements FlinkStreamSink, FlinkBatchSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileSink.class);
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
index cd1e010..ed80e63 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
@@ -45,7 +45,7 @@ import org.apache.parquet.schema.MessageType;
import java.util.List;
import java.util.Map;
-public class FileSource implements FlinkBatchSource<Row> {
+public class FileSource implements FlinkBatchSource {
private static final long serialVersionUID = -5206798549756998426L;
private static final int DEFAULT_BATCH_SIZE = 1000;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
index 82209b4..8f81688 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
@@ -30,7 +30,7 @@ import org.apache.flink.types.Row;
import java.util.List;
-public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
+public class InfluxDbSink implements FlinkBatchSink {
private static final long serialVersionUID = 7358988750295693096L;
private static final String SERVER_URL = "server_url";
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
index a8cf18e..019e7e9 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
@@ -42,7 +42,7 @@ import org.apache.flink.types.Row;
import java.util.HashMap;
import java.util.List;
-public class InfluxDbSource implements FlinkBatchSource<Row> {
+public class InfluxDbSource implements FlinkBatchSource {
private Config config;
private InfluxDbInputFormat influxDbInputFormat;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
index 2a3e615..aaa41fe 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
@@ -52,7 +52,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
-public class JdbcSink implements FlinkStreamSink<Row, Row>,
FlinkBatchSink<Row, Row> {
+public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
private static final long serialVersionUID = 3677571223952518115L;
private static final int DEFAULT_BATCH_SIZE = 5000;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
index bfd1b7b..e028880 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
@@ -56,7 +56,7 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class JdbcSource implements FlinkBatchSource<Row> {
+public class JdbcSource implements FlinkBatchSource {
private static final long serialVersionUID = -3349505356339446415L;
private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcSource.class);
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/sink/KafkaTable.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/sink/KafkaTable.java
index a8857ac..7e6bf74 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/sink/KafkaTable.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/sink/KafkaTable.java
@@ -42,7 +42,7 @@ import javax.annotation.Nullable;
import java.util.Properties;
-public class KafkaTable implements FlinkStreamSink<Row, Row> {
+public class KafkaTable implements FlinkStreamSink {
private static final long serialVersionUID = 3980751499724935230L;
private Config config;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
index a2e2721..42e1fa5 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
@@ -46,7 +46,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Properties;
-public class KafkaTableStream implements FlinkStreamSource<Row> {
+public class KafkaTableStream implements FlinkStreamSource {
private static final long serialVersionUID = 5287018194573371428L;
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaTableStream.class);
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
index ae37a3e..549bf8d 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
-public class SocketStream implements FlinkStreamSource<Row> {
+public class SocketStream implements FlinkStreamSource {
private static final long serialVersionUID = 986629276153771291L;
private Config config;
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
index d112515..261e4dd 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.command.FlinkCommandArgs;
import org.apache.seatunnel.config.ConfigBuilder;
import org.apache.seatunnel.config.PluginType;
import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.env.RuntimeEnv;
import java.util.List;
@@ -37,9 +38,9 @@ public class FlinkTaskExecuteCommand extends
BaseTaskExecuteCommand<FlinkCommand
public void execute(FlinkCommandArgs flinkCommandArgs) {
ConfigBuilder configBuilder = new
ConfigBuilder(flinkCommandArgs.getConfigFile(),
flinkCommandArgs.getEngineType());
- List<BaseSource> sources =
configBuilder.createPlugins(PluginType.SOURCE);
- List<BaseTransform> transforms =
configBuilder.createPlugins(PluginType.TRANSFORM);
- List<BaseSink> sinks = configBuilder.createPlugins(PluginType.SINK);
+ List<BaseSource<RuntimeEnv>> sources =
configBuilder.createPlugins(PluginType.SOURCE);
+ List<BaseTransform<RuntimeEnv>> transforms =
configBuilder.createPlugins(PluginType.TRANSFORM);
+ List<BaseSink<RuntimeEnv>> sinks =
configBuilder.createPlugins(PluginType.SINK);
Execution execution = configBuilder.createExecution();
baseCheckConfig(sources, transforms, sinks);
prepare(configBuilder.getEnv(), sources, transforms, sinks);
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
index abf3a3e..44e0b53 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.config;
+import org.apache.seatunnel.apis.BaseSink;
+import org.apache.seatunnel.apis.BaseSource;
+import org.apache.seatunnel.apis.BaseTransform;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.env.RuntimeEnv;
@@ -175,7 +178,7 @@ public class ConfigBuilder {
this.createPlugins(PluginType.SINK);
}
- public <T extends Plugin<?>> List<T> createPlugins(PluginType type) {
+ public <T extends Plugin<? extends RuntimeEnv>> List<T>
createPlugins(PluginType type) {
Objects.requireNonNull(type, "PluginType can not be null when create
plugins!");
List<T> basePluginList = new ArrayList<>();
List<? extends Config> configList =
config.getConfigList(type.getType());
@@ -212,8 +215,14 @@ public class ConfigBuilder {
return env;
}
- public Execution createExecution() {
- Execution execution = null;
+ public Execution<
+ ? extends BaseSource<? extends RuntimeEnv>,
+ ? extends BaseTransform<? extends RuntimeEnv>,
+ ? extends BaseSink<? extends RuntimeEnv>> createExecution() {
+ Execution<
+ ? extends BaseSource<? extends RuntimeEnv>,
+ ? extends BaseTransform<? extends RuntimeEnv>,
+ ? extends BaseSink<? extends RuntimeEnv>> execution = null;
switch (engine) {
case SPARK:
SparkEnvironment sparkEnvironment = (SparkEnvironment) env;
diff --git
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
index 852ba27..7ce02aa 100644
---
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
+++
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
-public class DataStreamToTable implements FlinkStreamTransform<Row, Row>,
FlinkBatchTransform<Row, Row> {
+public class DataStreamToTable implements FlinkStreamTransform,
FlinkBatchTransform {
private static final long serialVersionUID = -7861928245025199286L;
private Config config;
diff --git
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
index 1b498b5..2292967 100644
---
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
+++
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
@@ -34,7 +34,7 @@ import org.apache.flink.types.Row;
import java.util.List;
-public class Split implements FlinkStreamTransform<Row, Row>,
FlinkBatchTransform<Row, Row> {
+public class Split implements FlinkStreamTransform, FlinkBatchTransform {
private Config config;
diff --git
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
index 6364a23..4594fd44 100644
---
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
+++
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
@@ -33,7 +33,7 @@ import
org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
-public class Sql implements FlinkStreamTransform<Row, Row>,
FlinkBatchTransform<Row, Row> {
+public class Sql implements FlinkStreamTransform, FlinkBatchTransform {
private String sql;
diff --git
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
index 140745d..a504a38 100644
---
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
+++
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
@@ -34,7 +34,7 @@ import
org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
-public class TableToDataStream implements FlinkStreamTransform<Row, Row>,
FlinkBatchTransform<Row, Row> {
+public class TableToDataStream implements FlinkStreamTransform,
FlinkBatchTransform {
private static final long serialVersionUID = 4556842426965038124L;
private Config config;