This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a65d081  [Improvement][Config] Print config in origin order (#1484)
a65d081 is described below

commit a65d081c915e4e2aee75003b742de55278c7ce5b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Mar 15 22:58:06 2022 +0800

    [Improvement][Config] Print config in origin order (#1484)
    
    * [Improvement][Config] Print config in origin order
    
    * Optimize generic type
    
    * Add license announce
---
 LICENSE                                            |   3 +-
 .../java/org/apache/seatunnel/env/Execution.java   |   5 +-
 .../apache/seatunnel/flink/BaseFlinkSource.java    |   4 +-
 .../seatunnel/flink/batch/FlinkBatchExecution.java |  48 +-
 .../seatunnel/flink/batch/FlinkBatchSink.java      |   5 +-
 .../seatunnel/flink/batch/FlinkBatchSource.java    |   5 +-
 .../seatunnel/flink/batch/FlinkBatchTransform.java |   5 +-
 .../flink/stream/FlinkStreamExecution.java         |  32 +-
 .../seatunnel/flink/stream/FlinkStreamSink.java    |   5 +-
 .../seatunnel/flink/stream/FlinkStreamSource.java  |   6 +-
 .../flink/stream/FlinkStreamTransform.java         |   5 +-
 seatunnel-config/seatunnel-config-base/pom.xml     |   1 +
 .../typesafe/config/impl/SimpleConfigObject.java   | 568 +++++++++++++++++++++
 .../apache/seatunnel/flink/sink/ConsoleSink.java   |   2 +-
 .../org/apache/seatunnel/flink/sink/DorisSink.java |   2 +-
 .../org/apache/seatunnel/flink/sink/DruidSink.java |   2 +-
 .../apache/seatunnel/flink/source/DruidSource.java |   2 +-
 .../apache/seatunnel/flink/sink/Elasticsearch.java |   2 +-
 .../seatunnel/flink/source/FakeSourceStream.java   |   2 +-
 .../org/apache/seatunnel/flink/sink/FileSink.java  |   2 +-
 .../apache/seatunnel/flink/source/FileSource.java  |   2 +-
 .../apache/seatunnel/flink/sink/InfluxDbSink.java  |   2 +-
 .../seatunnel/flink/source/InfluxDbSource.java     |   2 +-
 .../org/apache/seatunnel/flink/sink/JdbcSink.java  |   2 +-
 .../apache/seatunnel/flink/source/JdbcSource.java  |   2 +-
 .../apache/seatunnel/flink/sink/KafkaTable.java    |   2 +-
 .../seatunnel/flink/source/KafkaTableStream.java   |   2 +-
 .../seatunnel/flink/source/SocketStream.java       |   2 +-
 .../command/flink/FlinkTaskExecuteCommand.java     |   7 +-
 .../org/apache/seatunnel/config/ConfigBuilder.java |  15 +-
 .../flink/transform/DataStreamToTable.java         |   2 +-
 .../apache/seatunnel/flink/transform/Split.java    |   2 +-
 .../org/apache/seatunnel/flink/transform/Sql.java  |   2 +-
 .../flink/transform/TableToDataStream.java         |   2 +-
 34 files changed, 664 insertions(+), 88 deletions(-)

diff --git a/LICENSE b/LICENSE
index c83457a..5334cec 100644
--- a/LICENSE
+++ b/LICENSE
@@ -213,4 +213,5 @@ 
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
 
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java
      from  https://github.com/lightbend/config
 
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java
              from  https://github.com/lightbend/config
 
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java
        from  https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java
     from https://github.com/lightbend/config
\ No newline at end of file
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java
     from https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java
     from https://github.com/lightbend/config
\ No newline at end of file
diff --git 
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
 
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
index f7aec8f..b6f4167 100644
--- 
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
+++ 
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
@@ -27,7 +27,10 @@ import java.util.List;
 /**
  * the SeaTunnel job's execution context
  */
-public interface Execution<SR extends BaseSource, TF extends BaseTransform, SK 
extends BaseSink> extends Plugin<Void> {
+public interface Execution<
+    SR extends BaseSource<? extends RuntimeEnv>,
+    TF extends BaseTransform<? extends RuntimeEnv>,
+    SK extends BaseSink<? extends RuntimeEnv>> extends Plugin<Void> {
 
     /**
      * start to execute the SeaTunnel job
diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java
index 1345eba..d013496 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java
@@ -22,5 +22,7 @@ import org.apache.seatunnel.apis.BaseSource;
 /**
  * a base interface indicates a source plugin running on Flink.
  */
-public interface BaseFlinkSource extends BaseSource<FlinkEnvironment> {
+public interface BaseFlinkSource<OUT> extends BaseSource<FlinkEnvironment> {
+
+    OUT getData(FlinkEnvironment env);
 }
diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 3e27f9b..017f5bd 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.env.Execution;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.util.TableUtil;
-import org.apache.seatunnel.plugin.Plugin;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -29,12 +28,13 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
+import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
+import java.util.Optional;
 
 public class FlinkBatchExecution implements Execution<FlinkBatchSource, 
FlinkBatchTransform, FlinkBatchSink> {
 
@@ -42,7 +42,7 @@ public class FlinkBatchExecution implements 
Execution<FlinkBatchSource, FlinkBat
 
     private Config config;
 
-    private FlinkEnvironment flinkEnvironment;
+    private final FlinkEnvironment flinkEnvironment;
 
     public FlinkBatchExecution(FlinkEnvironment flinkEnvironment) {
         this.flinkEnvironment = flinkEnvironment;
@@ -50,31 +50,25 @@ public class FlinkBatchExecution implements 
Execution<FlinkBatchSource, FlinkBat
 
     @Override
     public void start(List<FlinkBatchSource> sources, 
List<FlinkBatchTransform> transforms, List<FlinkBatchSink> sinks) throws 
Exception {
-        List<DataSet> data = new ArrayList<>();
+        List<DataSet<Row>> data = new ArrayList<>();
 
         for (FlinkBatchSource source : sources) {
-            DataSet dataSet = source.getData(flinkEnvironment);
+            DataSet<Row> dataSet = source.getData(flinkEnvironment);
             data.add(dataSet);
-            registerResultTable(source, dataSet);
+            registerResultTable(source.getConfig(), dataSet);
         }
 
-        DataSet input = data.get(0);
+        DataSet<Row> input = data.get(0);
 
         for (FlinkBatchTransform transform : transforms) {
-            DataSet dataSet = fromSourceTable(transform);
-            if (Objects.isNull(dataSet)) {
-                dataSet = input;
-            }
+            DataSet<Row> dataSet = 
fromSourceTable(transform.getConfig()).orElse(input);
             input = transform.processBatch(flinkEnvironment, dataSet);
-            registerResultTable(transform, input);
+            registerResultTable(transform.getConfig(), input);
             transform.registerFunction(flinkEnvironment);
         }
 
         for (FlinkBatchSink sink : sinks) {
-            DataSet dataSet = fromSourceTable(sink);
-            if (Objects.isNull(dataSet)) {
-                dataSet = input;
-            }
+            DataSet<Row> dataSet = 
fromSourceTable(sink.getConfig()).orElse(input);
             sink.outputBatch(flinkEnvironment, dataSet);
         }
 
@@ -88,14 +82,13 @@ public class FlinkBatchExecution implements 
Execution<FlinkBatchSource, FlinkBat
         }
     }
 
-    private void registerResultTable(Plugin plugin, DataSet dataSet) {
-        Config config = plugin.getConfig();
-        if (config.hasPath(RESULT_TABLE_NAME)) {
-            String name = config.getString(RESULT_TABLE_NAME);
+    private void registerResultTable(Config sourceConfig, DataSet<Row> 
dataSet) {
+        if (sourceConfig.hasPath(RESULT_TABLE_NAME)) {
+            String name = sourceConfig.getString(RESULT_TABLE_NAME);
             BatchTableEnvironment tableEnvironment = 
flinkEnvironment.getBatchTableEnvironment();
             if (!TableUtil.tableExists(tableEnvironment, name)) {
-                if (config.hasPath("field_name")) {
-                    String fieldName = config.getString("field_name");
+                if (sourceConfig.hasPath("field_name")) {
+                    String fieldName = sourceConfig.getString("field_name");
                     tableEnvironment.registerDataSet(name, dataSet, fieldName);
                 } else {
                     tableEnvironment.registerDataSet(name, dataSet);
@@ -104,14 +97,13 @@ public class FlinkBatchExecution implements 
Execution<FlinkBatchSource, FlinkBat
         }
     }
 
-    private DataSet fromSourceTable(Plugin plugin) {
-        Config config = plugin.getConfig();
-        if (config.hasPath(SOURCE_TABLE_NAME)) {
+    private Optional<DataSet<Row>> fromSourceTable(Config pluginConfig) {
+        if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
             BatchTableEnvironment tableEnvironment = 
flinkEnvironment.getBatchTableEnvironment();
-            Table table = 
tableEnvironment.scan(config.getString(SOURCE_TABLE_NAME));
-            return TableUtil.tableToDataSet(tableEnvironment, table);
+            Table table = 
tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
+            return 
Optional.ofNullable(TableUtil.tableToDataSet(tableEnvironment, table));
         }
-        return null;
+        return Optional.empty();
     }
 
     @Override
diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java
index f59c09e..f09abcb 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java
@@ -22,14 +22,15 @@ import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
 
 /**
  * a FlinkBatchSink plugin will write data to other system using Flink DataSet 
API.
  */
-public interface FlinkBatchSink<IN, OUT> extends BaseFlinkSink {
+public interface FlinkBatchSink extends BaseFlinkSink {
 
     @Nullable
-    DataSink<OUT> outputBatch(FlinkEnvironment env, DataSet<IN> inDataSet);
+    DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet);
 }
diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java
index 9d71f10..6047236 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java
@@ -21,11 +21,12 @@ import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.types.Row;
 
 /**
  * a FlinkBatchSource plugin will read data from other system using Flink 
DataSet API.
  */
-public interface FlinkBatchSource<T> extends BaseFlinkSource {
+public interface FlinkBatchSource extends BaseFlinkSource<DataSet<Row>> {
 
-    DataSet<T> getData(FlinkEnvironment env);
+    DataSet<Row> getData(FlinkEnvironment env);
 }
diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java
index 3a81c66..ba63722 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java
@@ -21,12 +21,13 @@ import org.apache.seatunnel.flink.BaseFlinkTransform;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.types.Row;
 
 /**
  * a FlinkBatchTransform plugin will do transformations to Flink DataSet.
  */
-public interface FlinkBatchTransform<IN, OUT> extends BaseFlinkTransform {
+public interface FlinkBatchTransform extends BaseFlinkTransform {
 
-    DataSet<OUT> processBatch(FlinkEnvironment env, DataSet<IN> data);
+    DataSet<Row> processBatch(FlinkEnvironment env, DataSet<Row> data);
 
 }
diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index e3a647b..753050e 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -28,12 +28,13 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
+import java.util.Optional;
 
 public class FlinkStreamExecution implements Execution<FlinkStreamSource, 
FlinkStreamTransform, FlinkStreamSink> {
 
@@ -41,7 +42,7 @@ public class FlinkStreamExecution implements 
Execution<FlinkStreamSource, FlinkS
 
     private Config config;
 
-    private FlinkEnvironment flinkEnvironment;
+    private final FlinkEnvironment flinkEnvironment;
 
     public FlinkStreamExecution(FlinkEnvironment streamEnvironment) {
         this.flinkEnvironment = streamEnvironment;
@@ -49,31 +50,25 @@ public class FlinkStreamExecution implements 
Execution<FlinkStreamSource, FlinkS
 
     @Override
     public void start(List<FlinkStreamSource> sources, 
List<FlinkStreamTransform> transforms, List<FlinkStreamSink> sinks) throws 
Exception {
-        List<DataStream> data = new ArrayList<>();
+        List<DataStream<Row>> data = new ArrayList<>();
 
         for (FlinkStreamSource source : sources) {
-            DataStream dataStream = source.getData(flinkEnvironment);
+            DataStream<Row> dataStream = source.getData(flinkEnvironment);
             data.add(dataStream);
             registerResultTable(source, dataStream);
         }
 
-        DataStream input = data.get(0);
+        DataStream<Row> input = data.get(0);
 
         for (FlinkStreamTransform transform : transforms) {
-            DataStream stream = fromSourceTable(transform);
-            if (Objects.isNull(stream)) {
-                stream = input;
-            }
+            DataStream<Row> stream = 
fromSourceTable(transform.getConfig()).orElse(input);
             input = transform.processStream(flinkEnvironment, stream);
             registerResultTable(transform, input);
             transform.registerFunction(flinkEnvironment);
         }
 
         for (FlinkStreamSink sink : sinks) {
-            DataStream stream = fromSourceTable(sink);
-            if (Objects.isNull(stream)) {
-                stream = input;
-            }
+            DataStream<Row> stream = 
fromSourceTable(sink.getConfig()).orElse(input);
             sink.outputStream(flinkEnvironment, stream);
         }
         try {
@@ -101,14 +96,13 @@ public class FlinkStreamExecution implements 
Execution<FlinkStreamSource, FlinkS
         }
     }
 
-    private DataStream fromSourceTable(Plugin plugin) {
-        Config config = plugin.getConfig();
-        if (config.hasPath(SOURCE_TABLE_NAME)) {
+    private Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
+        if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
             StreamTableEnvironment tableEnvironment = 
flinkEnvironment.getStreamTableEnvironment();
-            Table table = 
tableEnvironment.scan(config.getString(SOURCE_TABLE_NAME));
-            return TableUtil.tableToDataStream(tableEnvironment, table, true);
+            Table table = 
tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
+            return 
Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
         }
-        return null;
+        return Optional.empty();
     }
 
     @Override
diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java
index ecb768d..b83e550 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java
@@ -22,15 +22,16 @@ import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
 
 /**
  * a FlinkStreamSink plugin will write data to other system using Flink 
DataStream API.
  */
-public interface FlinkStreamSink<IN, OUT> extends BaseFlinkSink {
+public interface FlinkStreamSink extends BaseFlinkSink {
 
     @Nullable
-    DataStreamSink<OUT> outputStream(FlinkEnvironment env, DataStream<IN> 
dataStream);
+    DataStreamSink<Row> outputStream(FlinkEnvironment env, DataStream<Row> 
dataStream);
 
 }
diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java
index c5b9156..096b665 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java
@@ -21,12 +21,12 @@ import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
 
 /**
  * a FlinkStreamSource plugin will read data from other system using Flink 
DataStream API.
  */
-public interface FlinkStreamSource<T> extends BaseFlinkSource {
-
-    DataStream<T> getData(FlinkEnvironment env);
+public interface FlinkStreamSource extends BaseFlinkSource<DataStream<Row>> {
 
+    DataStream<Row> getData(FlinkEnvironment env);
 }
diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java
index 3df88aa..55d9e2d 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java
@@ -21,11 +21,12 @@ import org.apache.seatunnel.flink.BaseFlinkTransform;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
 
 /**
  * a FlinkBatchTransform plugin will do transformations to Flink DataStream.
  */
-public interface FlinkStreamTransform<IN, OUT> extends BaseFlinkTransform {
+public interface FlinkStreamTransform extends BaseFlinkTransform {
 
-    DataStream<OUT> processStream(FlinkEnvironment env, DataStream<IN> 
dataStream);
+    DataStream<Row> processStream(FlinkEnvironment env, DataStream<Row> 
dataStream);
 }
diff --git a/seatunnel-config/seatunnel-config-base/pom.xml 
b/seatunnel-config/seatunnel-config-base/pom.xml
index 8d1a8b7..bac9f61 100644
--- a/seatunnel-config/seatunnel-config-base/pom.xml
+++ b/seatunnel-config/seatunnel-config-base/pom.xml
@@ -80,6 +80,7 @@
                                 
<exclude>com/typesafe/config/impl/ConfigNodePath.class</exclude>
                                 
<exclude>com/typesafe/config/impl/PathParser.class</exclude>
                                 
<exclude>com/typesafe/config/impl/Path.class</exclude>
+                                
<exclude>com/typesafe/config/impl/SimpleConfigObject.class</exclude>
                             </excludes>
                         </filter>
                     </filters>
diff --git 
a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
 
b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
new file mode 100644
index 0000000..ed52f4b
--- /dev/null
+++ 
b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
@@ -0,0 +1,568 @@
+/*
+ *   Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigOrigin;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+final class SimpleConfigObject extends AbstractConfigObject implements 
Serializable {
+    private static final long serialVersionUID = 2L;
+    private final Map<String, AbstractConfigValue> value;
+    private final boolean resolved;
+    private final boolean ignoresFallbacks;
+    private static final SimpleConfigObject EMPTY_INSTANCE = 
empty(SimpleConfigOrigin.newSimple("empty config"));
+    private static final int HASH_CODE = 41;
+
+    SimpleConfigObject(ConfigOrigin origin, Map<String, AbstractConfigValue> 
value, ResolveStatus status, boolean ignoresFallbacks) {
+        super(origin);
+        if (value == null) {
+            throw new ConfigException.BugOrBroken("creating config object with 
null map");
+        } else {
+            this.value = value;
+            this.resolved = status == ResolveStatus.RESOLVED;
+            this.ignoresFallbacks = ignoresFallbacks;
+            if (status != ResolveStatus.fromValues(value.values())) {
+                throw new ConfigException.BugOrBroken("Wrong resolved status 
on " + this);
+            }
+        }
+    }
+
+    SimpleConfigObject(ConfigOrigin origin, Map<String, AbstractConfigValue> 
value) {
+        this(origin, value, ResolveStatus.fromValues(value.values()), false);
+    }
+
+    public SimpleConfigObject withOnlyKey(String key) {
+        return this.withOnlyPath(Path.newKey(key));
+    }
+
+    public SimpleConfigObject withoutKey(String key) {
+        return this.withoutPath(Path.newKey(key));
+    }
+
+    protected SimpleConfigObject withOnlyPathOrNull(Path path) {
+        String key = path.first();
+        Path next = path.remainder();
+        AbstractConfigValue v = this.value.get(key);
+        if (next != null) {
+            if (v instanceof AbstractConfigObject) {
+                v = ((AbstractConfigObject) v).withOnlyPathOrNull(next);
+            } else {
+                v = null;
+            }
+        }
+
+        return v == null ? null : new SimpleConfigObject(this.origin(), 
Collections.singletonMap(key, v), v.resolveStatus(), this.ignoresFallbacks);
+    }
+
+    SimpleConfigObject withOnlyPath(Path path) {
+        SimpleConfigObject o = this.withOnlyPathOrNull(path);
+        return o == null ? new SimpleConfigObject(this.origin(), 
Collections.emptyMap(), ResolveStatus.RESOLVED, this.ignoresFallbacks) : o;
+    }
+
+    SimpleConfigObject withoutPath(Path path) {
+        String key = path.first();
+        Path next = path.remainder();
+        AbstractConfigValue v = this.value.get(key);
+        HashMap<String, AbstractConfigValue> smaller;
+        if (next != null && v instanceof AbstractConfigObject) {
+            v = ((AbstractConfigObject) v).withoutPath(next);
+            smaller = new HashMap<>(this.value);
+            smaller.put(key, v);
+            return new SimpleConfigObject(this.origin(), smaller, 
ResolveStatus.fromValues(smaller.values()), this.ignoresFallbacks);
+        } else if (next == null && v != null) {
+            smaller = new HashMap<>(this.value.size() - 1);
+
+            for (Entry<String, AbstractConfigValue> 
stringAbstractConfigValueEntry : this.value.entrySet()) {
+                if (!stringAbstractConfigValueEntry.getKey().equals(key)) {
+                    smaller.put(stringAbstractConfigValueEntry.getKey(), 
stringAbstractConfigValueEntry.getValue());
+                }
+            }
+
+            return new SimpleConfigObject(this.origin(), smaller, 
ResolveStatus.fromValues(smaller.values()), this.ignoresFallbacks);
+        } else {
+            return this;
+        }
+    }
+
+    public SimpleConfigObject withValue(String key, ConfigValue v) {
+        if (v == null) {
+            throw new ConfigException.BugOrBroken("Trying to store null 
ConfigValue in a ConfigObject");
+        } else {
+            Map newMap;
+            if (this.value.isEmpty()) {
+                newMap = Collections.singletonMap(key, (AbstractConfigValue) 
v);
+            } else {
+                newMap = new HashMap<>(this.value);
+                newMap.put(key, v);
+            }
+
+            return new SimpleConfigObject(this.origin(), newMap, 
ResolveStatus.fromValues(newMap.values()), this.ignoresFallbacks);
+        }
+    }
+
+    SimpleConfigObject withValue(Path path, ConfigValue v) {
+        String key = path.first();
+        Path next = path.remainder();
+        if (next == null) {
+            return this.withValue(key, v);
+        } else {
+            AbstractConfigValue child = this.value.get(key);
+            if (child instanceof AbstractConfigObject) {
+                return this.withValue(key, ((AbstractConfigObject) 
child).withValue(next, v));
+            } else {
+                SimpleConfig subtree = ((AbstractConfigValue) 
v).atPath(SimpleConfigOrigin.newSimple("withValue(" + next.render() + ")"), 
next);
+                return this.withValue(key, subtree.root());
+            }
+        }
+    }
+
+    protected AbstractConfigValue attemptPeekWithPartialResolve(String key) {
+        return this.value.get(key);
+    }
+
+    private SimpleConfigObject newCopy(ResolveStatus newStatus, ConfigOrigin 
newOrigin, boolean newIgnoresFallbacks) {
+        return new SimpleConfigObject(newOrigin, this.value, newStatus, 
newIgnoresFallbacks);
+    }
+
+    protected SimpleConfigObject newCopy(ResolveStatus newStatus, ConfigOrigin 
newOrigin) {
+        return this.newCopy(newStatus, newOrigin, this.ignoresFallbacks);
+    }
+
+    protected SimpleConfigObject withFallbacksIgnored() {
+        return this.ignoresFallbacks ? this : 
this.newCopy(this.resolveStatus(), this.origin(), true);
+    }
+
+    ResolveStatus resolveStatus() {
+        return ResolveStatus.fromBoolean(this.resolved);
+    }
+
+    public SimpleConfigObject replaceChild(AbstractConfigValue child, 
AbstractConfigValue replacement) {
+        Map<String, AbstractConfigValue> newChildren = new 
HashMap<>(this.value);
+        Iterator<Entry<String, AbstractConfigValue>> var4 = 
newChildren.entrySet().iterator();
+
+        Entry<String, AbstractConfigValue> old;
+        do {
+            if (!var4.hasNext()) {
+                throw new 
ConfigException.BugOrBroken("SimpleConfigObject.replaceChild did not find " + 
child + " in " + this);
+            }
+
+            old = var4.next();
+        } while (old.getValue() != child);
+
+        if (replacement != null) {
+            old.setValue(replacement);
+        } else {
+            newChildren.remove(old.getKey());
+        }
+
+        return new SimpleConfigObject(this.origin(), newChildren, 
ResolveStatus.fromValues(newChildren.values()), this.ignoresFallbacks);
+    }
+
+    public boolean hasDescendant(AbstractConfigValue descendant) {
+        Iterator<AbstractConfigValue> var2 = this.value.values().iterator();
+
+        AbstractConfigValue child;
+        do {
+            if (!var2.hasNext()) {
+                var2 = this.value.values().iterator();
+
+                do {
+                    if (!var2.hasNext()) {
+                        return false;
+                    }
+
+                    child = var2.next();
+                } while (!(child instanceof Container) || !((Container) 
child).hasDescendant(descendant));
+
+                return true;
+            }
+
+            child = var2.next();
+        } while (child != descendant);
+
+        return true;
+    }
+
+    protected boolean ignoresFallbacks() {
+        return this.ignoresFallbacks;
+    }
+
+    public Map<String, Object> unwrapped() {
+        Map<String, Object> m = new HashMap<>();
+
+        for (Entry<String, AbstractConfigValue> stringAbstractConfigValueEntry 
: this.value.entrySet()) {
+            m.put(stringAbstractConfigValueEntry.getKey(), 
stringAbstractConfigValueEntry.getValue().unwrapped());
+        }
+
+        return m;
+    }
+
+    protected SimpleConfigObject mergedWithObject(AbstractConfigObject 
abstractFallback) {
+        this.requireNotIgnoringFallbacks();
+        if (!(abstractFallback instanceof SimpleConfigObject)) {
+            throw new ConfigException.BugOrBroken("should not be reached 
(merging non-SimpleConfigObject)");
+        } else {
+            SimpleConfigObject fallback = (SimpleConfigObject) 
abstractFallback;
+            boolean changed = false;
+            boolean allResolved = true;
+            Map<String, AbstractConfigValue> merged = new HashMap<>();
+            Set<String> allKeys = new HashSet<>();
+            allKeys.addAll(this.keySet());
+            allKeys.addAll(fallback.keySet());
+
+            for (String key : allKeys) {
+                AbstractConfigValue first = this.value.get(key);
+                AbstractConfigValue second = fallback.value.get(key);
+                AbstractConfigValue kept;
+                if (first == null) {
+                    kept = second;
+                } else if (second == null) {
+                    kept = first;
+                } else {
+                    kept = first.withFallback(second);
+                }
+
+                merged.put(key, kept);
+                if (first != kept) {
+                    changed = true;
+                }
+
+                if (kept.resolveStatus() == ResolveStatus.UNRESOLVED) {
+                    allResolved = false;
+                }
+            }
+
+            ResolveStatus newResolveStatus = 
ResolveStatus.fromBoolean(allResolved);
+            boolean newIgnoresFallbacks = fallback.ignoresFallbacks();
+            if (changed) {
+                return new SimpleConfigObject(mergeOrigins(this, fallback), 
merged, newResolveStatus, newIgnoresFallbacks);
+            } else if (newResolveStatus == this.resolveStatus() && 
newIgnoresFallbacks == this.ignoresFallbacks()) {
+                return this;
+            } else {
+                return this.newCopy(newResolveStatus, this.origin(), 
newIgnoresFallbacks);
+            }
+        }
+    }
+
+    private SimpleConfigObject modify(NoExceptionsModifier modifier) {
+        try {
+            return this.modifyMayThrow(modifier);
+        } catch (RuntimeException var3) {
+            throw var3;
+        } catch (Exception var4) {
+            throw new ConfigException.BugOrBroken("unexpected checked 
exception", var4);
+        }
+    }
+
+    private SimpleConfigObject modifyMayThrow(Modifier modifier) throws 
Exception {
+        Map<String, AbstractConfigValue> changes = null;
+
+        for (String k : this.keySet()) {
+            AbstractConfigValue v = this.value.get(k);
+            AbstractConfigValue modified = modifier.modifyChildMayThrow(k, v);
+            if (modified != v) {
+                if (changes == null) {
+                    changes = new HashMap<>();
+                }
+
+                changes.put(k, modified);
+            }
+        }
+
+        if (changes == null) {
+            return this;
+        } else {
+            Map<String, AbstractConfigValue> modified = new HashMap<>();
+            boolean sawUnresolved = false;
+
+            for (String k : this.keySet()) {
+                AbstractConfigValue newValue;
+                if (changes.containsKey(k)) {
+                    newValue = changes.get(k);
+                    if (newValue != null) {
+                        modified.put(k, newValue);
+                        if (newValue.resolveStatus() == 
ResolveStatus.UNRESOLVED) {
+                            sawUnresolved = true;
+                        }
+                    }
+                } else {
+                    newValue = this.value.get(k);
+                    modified.put(k, newValue);
+                    if (newValue.resolveStatus() == ResolveStatus.UNRESOLVED) {
+                        sawUnresolved = true;
+                    }
+                }
+            }
+
+            return new SimpleConfigObject(this.origin(), modified, 
sawUnresolved ? ResolveStatus.UNRESOLVED : ResolveStatus.RESOLVED, 
this.ignoresFallbacks());
+        }
+    }
+
    /**
     * Resolves substitutions in this object's children.
     *
     * Already-resolved objects short-circuit and are returned as-is. Otherwise each
     * child is resolved through a ResolveModifier, with this object pushed onto the
     * source's parent stack so relative substitutions can look upward.
     *
     * @throws NotPossibleToResolve if a substitution cannot be resolved in this context
     */
    ResolveResult<? extends AbstractConfigObject> resolveSubstitutions(ResolveContext context, ResolveSource source) throws NotPossibleToResolve {
        if (this.resolveStatus() == ResolveStatus.RESOLVED) {
            return ResolveResult.make(context, this);
        } else {
            ResolveSource sourceWithParent = source.pushParent(this);

            try {
                SimpleConfigObject.ResolveModifier modifier = new SimpleConfigObject.ResolveModifier(context, sourceWithParent);
                AbstractConfigValue value = this.modifyMayThrow(modifier);
                // The modifier may have advanced the context; use its final state.
                return ResolveResult.make(modifier.context, value).asObjectResult();
            } catch (NotPossibleToResolve | RuntimeException var6) {
                throw var6;
            } catch (Exception var8) {
                // modifyMayThrow only rethrows what the modifier throws; anything else is a bug.
                throw new ConfigException.BugOrBroken("unexpected checked exception", var8);
            }
        }
    }
+
+    SimpleConfigObject relativized(final Path prefix) {
+        return this.modify(new NoExceptionsModifier() {
+            public AbstractConfigValue modifyChild(String key, 
AbstractConfigValue v) {
+                return v.relativized(prefix);
+            }
+        });
+    }
+
    /**
     * Renders this object into {@code sb} as JSON or HOCON depending on options.
     *
     * NOTE: keys are rendered in keySet() iteration order — deliberately NOT sorted —
     * so output follows the map's (origin/insertion) order.
     *
     * @param sb      output buffer, appended in place
     * @param indent  current indent level (only used when options.getFormatted())
     * @param atRoot  true for the top-level object; HOCON omits the outer braces there
     * @param options controls JSON vs HOCON, formatting, and comment emission
     */
    protected void render(StringBuilder sb, int indent, boolean atRoot, ConfigRenderOptions options) {
        if (this.isEmpty()) {
            sb.append("{}");
        } else {
            // HOCON at root level is rendered without surrounding braces.
            boolean outerBraces = options.getJson() || !atRoot;
            int innerIndent;
            if (outerBraces) {
                innerIndent = indent + 1;
                sb.append("{");
                if (options.getFormatted()) {
                    sb.append('\n');
                }
            } else {
                innerIndent = indent;
            }

            // How many trailing chars (separator and/or newline) to trim after the last entry.
            int separatorCount = 0;
            String[] keys = this.keySet().toArray(new String[0]);

            for (String k : keys) {
                AbstractConfigValue v = this.value.get(k);
                if (options.getOriginComments()) {
                    // Emit the value's origin description as '#' comment lines.
                    String[] lines = v.origin().description().split("\n");

                    for (String l : lines) {
                        indent(sb, indent + 1, options);
                        sb.append('#');
                        if (!l.isEmpty()) {
                            sb.append(' ');
                        }

                        sb.append(l);
                        sb.append("\n");
                    }
                }

                if (options.getComments()) {

                    // Emit user comments attached to the value's origin.
                    for (String comment : v.origin().comments()) {
                        indent(sb, innerIndent, options);
                        sb.append("#");
                        if (!comment.startsWith(" ")) {
                            sb.append(' ');
                        }

                        sb.append(comment);
                        sb.append("\n");
                    }
                }

                indent(sb, innerIndent, options);
                v.render(sb, innerIndent, false, k, options);
                if (options.getFormatted()) {
                    if (options.getJson()) {
                        // Formatted JSON: entry ends with ",\n" — trim both after the last one.
                        sb.append(",");
                        separatorCount = 2;
                    } else {
                        // Formatted HOCON: newline alone separates entries.
                        separatorCount = 1;
                    }

                    sb.append('\n');
                } else {
                    sb.append(",");
                    separatorCount = 1;
                }
            }

            // Remove the separator appended after the final entry.
            sb.setLength(sb.length() - separatorCount);
            if (outerBraces) {
                if (options.getFormatted()) {
                    sb.append('\n');
                    indent(sb, indent, options);
                }

                sb.append("}");
            }
        }

        if (atRoot && options.getFormatted()) {
            sb.append('\n');
        }

    }
+
    /** Direct child lookup; returns null when the key is absent. */
    public AbstractConfigValue get(Object key) {
        return this.value.get(key);
    }
+
+    private static boolean mapEquals(Map<String, ConfigValue> a, Map<String, 
ConfigValue> b) {
+        if (a == b) {
+            return true;
+        } else {
+            Set<String> aKeys = a.keySet();
+            Set<String> bKeys = b.keySet();
+            if (aKeys.equals(bKeys)) {
+                Iterator<String> var4 = aKeys.iterator();
+
+                String key;
+                do {
+                    if (!var4.hasNext()) {
+                        return true;
+                    }
+
+                    key = var4.next();
+                } while (a.get(key).equals(b.get(key)));
+
+            }
+            return false;
+        }
+    }
+
+    @SuppressWarnings("magicnumber")
+    private static int mapHash(Map<String, ConfigValue> m) {
+        List<String> keys = new ArrayList<>(m.keySet());
+        Collections.sort(keys);
+        int valuesHash = 0;
+
+        String k;
+        for (Iterator<String> var3 = keys.iterator(); var3.hasNext(); 
valuesHash += m.get(k).hashCode()) {
+            k = var3.next();
+        }
+
+        return HASH_CODE * (HASH_CODE + keys.hashCode()) + valuesHash;
+    }
+
    /** Part of the equals() contract: any ConfigObject may compare equal to this. */
    protected boolean canEqual(Object other) {
        return other instanceof ConfigObject;
    }
+
+    public boolean equals(Object other) {
+        if (!(other instanceof ConfigObject)) {
+            return false;
+        } else {
+            return this.canEqual(other) && mapEquals(this, (ConfigObject) 
other);
+        }
+    }
+
    /** Order-insensitive hash over sorted keys + value hashes; consistent with equals(). */
    public int hashCode() {
        return mapHash(this);
    }
+
    /** Delegates to the backing child map. */
    public boolean containsKey(Object key) {
        return this.value.containsKey(key);
    }
+
    /** Key set view of the backing child map (iteration order is the map's order). */
    public Set<String> keySet() {
        return this.value.keySet();
    }
+
    /** Delegates to the backing child map. */
    public boolean containsValue(Object v) {
        return this.value.containsValue(v);
    }
+
+    public Set<Entry<String, ConfigValue>> entrySet() {
+        HashSet<Entry<String, ConfigValue>> entries = new HashSet<>();
+
+        for (Entry<String, AbstractConfigValue> stringAbstractConfigValueEntry 
: this.value.entrySet()) {
+            entries.add(new 
AbstractMap.SimpleImmutableEntry<>(stringAbstractConfigValueEntry.getKey(), 
stringAbstractConfigValueEntry.getValue()));
+        }
+
+        return entries;
+    }
+
    /** Delegates to the backing child map. */
    public boolean isEmpty() {
        return this.value.isEmpty();
    }
+
    /** Delegates to the backing child map. */
    public int size() {
        return this.value.size();
    }
+
+    public Collection<ConfigValue> values() {
+        return new HashSet<>(this.value.values());
+    }
+
    /** Shared empty-object singleton. */
    static SimpleConfigObject empty() {
        return EMPTY_INSTANCE;
    }
+
+    static SimpleConfigObject empty(ConfigOrigin origin) {
+        return origin == null ? empty() : new SimpleConfigObject(origin, 
Collections.emptyMap());
+    }
+
    /** Empty object whose origin marks a lookup miss under {@code baseOrigin}. */
    static SimpleConfigObject emptyMissing(ConfigOrigin baseOrigin) {
        return new SimpleConfigObject(SimpleConfigOrigin.newSimple(baseOrigin.description() + " (not found)"), Collections.emptyMap());
    }
+
    /** Serialization proxy: serialize via SerializedConfigValue instead of this class. */
    private Object writeReplace() throws ObjectStreamException {
        return new SerializedConfigValue(this);
    }
+
    /**
     * Modifier used during substitution resolution. Threads a mutable ResolveContext
     * through the per-child resolve calls, restoring the caller's original
     * restrict-to-child path after each child so siblings see a clean context.
     */
    private static final class ResolveModifier implements Modifier {
        // The restriction in force when resolution of this object started;
        // re-applied after every child resolve.
        final Path originalRestrict;
        // Mutated as children are resolved; read back by resolveSubstitutions().
        ResolveContext context;
        final ResolveSource source;

        ResolveModifier(ResolveContext context, ResolveSource source) {
            this.context = context;
            this.source = source;
            this.originalRestrict = context.restrictToChild();
        }

        public AbstractConfigValue modifyChildMayThrow(String key, AbstractConfigValue v) throws NotPossibleToResolve {
            if (this.context.isRestrictedToChild()) {
                // Only resolve the child on the restricted path; others pass through.
                if (key.equals(this.context.restrictToChild().first())) {
                    Path remainder = this.context.restrictToChild().remainder();
                    if (remainder != null) {
                        // Descend with the narrowed restriction, then restore ours.
                        ResolveResult<? extends AbstractConfigValue> result = this.context.restrict(remainder).resolve(v, this.source);
                        this.context = result.context.unrestricted().restrict(this.originalRestrict);
                        return result.value;
                    } else {
                        // Restriction ends exactly at this key: nothing below to resolve.
                        return v;
                    }
                } else {
                    return v;
                }
            } else {
                // Unrestricted: resolve every child.
                ResolveResult<? extends AbstractConfigValue> result = this.context.unrestricted().resolve(v, this.source);
                this.context = result.context.unrestricted().restrict(this.originalRestrict);
                return result.value;
            }
        }
    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
index 422ea43..e57d72e 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
@@ -34,7 +34,7 @@ import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConsoleSink extends RichOutputFormat<Row> implements 
FlinkBatchSink<Row, Row>, FlinkStreamSink<Row, Row> {
+public class ConsoleSink extends RichOutputFormat<Row> implements 
FlinkBatchSink, FlinkStreamSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsoleSink.class);
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
index 7fe406f..eeb1e87 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
@@ -41,7 +41,7 @@ import javax.annotation.Nullable;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-public class DorisSink implements FlinkStreamSink<Row, Row>, 
FlinkBatchSink<Row, Row> {
+public class DorisSink implements FlinkStreamSink, FlinkBatchSink {
 
     private static final long serialVersionUID = 4747849769146047770L;
     private static final int DEFAULT_BATCH_SIZE = 100;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
index 82c8f86..aa3746c 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidSink.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.types.Row;
 
-public class DruidSink implements FlinkBatchSink<Row, Row> {
+public class DruidSink implements FlinkBatchSink {
 
     private static final long serialVersionUID = -2967782261362988646L;
     private static final String COORDINATOR_URL = "coordinator_url";
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
index b271ff0..afa5eb6 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidSource.java
@@ -52,7 +52,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 
-public class DruidSource implements FlinkBatchSource<Row> {
+public class DruidSource implements FlinkBatchSource {
 
     private static final long serialVersionUID = 8152628883440481281L;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DruidSource.class);
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
index be235c0..6965947 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
@@ -55,7 +55,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class Elasticsearch implements FlinkStreamSink<Row, Row>, 
FlinkBatchSink<Row, Row> {
+public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
 
     private static final long serialVersionUID = 8445868321245456793L;
     private static final int DEFAULT_CONFIG_SIZE = 3;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
index 69a2fff..b7915dd 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
@@ -35,7 +35,7 @@ import org.apache.flink.types.Row;
 
 import java.util.concurrent.TimeUnit;
 
-public class FakeSourceStream extends RichParallelSourceFunction<Row> 
implements FlinkStreamSource<Row> {
+public class FakeSourceStream extends RichParallelSourceFunction<Row> 
implements FlinkStreamSource {
 
     private static final long serialVersionUID = -3026082767246767679L;
     private volatile boolean running = true;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index 4569343..defaef7 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.PrintStream;
 
-public class FileSink implements FlinkStreamSink<Row, Row>, 
FlinkBatchSink<Row, Row> {
+public class FileSink implements FlinkStreamSink, FlinkBatchSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FileSink.class);
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
index cd1e010..ed80e63 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
@@ -45,7 +45,7 @@ import org.apache.parquet.schema.MessageType;
 import java.util.List;
 import java.util.Map;
 
-public class FileSource implements FlinkBatchSource<Row> {
+public class FileSource implements FlinkBatchSource {
 
     private static final long serialVersionUID = -5206798549756998426L;
     private static final int DEFAULT_BATCH_SIZE = 1000;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
index 82209b4..8f81688 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/sink/InfluxDbSink.java
@@ -30,7 +30,7 @@ import org.apache.flink.types.Row;
 
 import java.util.List;
 
-public class InfluxDbSink implements FlinkBatchSink<Row, Row> {
+public class InfluxDbSink implements FlinkBatchSink {
 
     private static final long serialVersionUID = 7358988750295693096L;
     private static final String SERVER_URL = "server_url";
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
index a8cf18e..019e7e9 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
@@ -42,7 +42,7 @@ import org.apache.flink.types.Row;
 import java.util.HashMap;
 import java.util.List;
 
-public class InfluxDbSource implements FlinkBatchSource<Row> {
+public class InfluxDbSource implements FlinkBatchSource {
 
     private Config config;
     private InfluxDbInputFormat influxDbInputFormat;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
index 2a3e615..aaa41fe 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
@@ -52,7 +52,7 @@ import javax.annotation.Nullable;
 
 import java.util.Arrays;
 
-public class JdbcSink implements FlinkStreamSink<Row, Row>, 
FlinkBatchSink<Row, Row> {
+public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
 
     private static final long serialVersionUID = 3677571223952518115L;
     private static final int DEFAULT_BATCH_SIZE = 5000;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
index bfd1b7b..e028880 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
@@ -56,7 +56,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public class JdbcSource implements FlinkBatchSource<Row> {
+public class JdbcSource implements FlinkBatchSource {
 
     private static final long serialVersionUID = -3349505356339446415L;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JdbcSource.class);
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/sink/KafkaTable.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/sink/KafkaTable.java
index a8857ac..7e6bf74 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/sink/KafkaTable.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/sink/KafkaTable.java
@@ -42,7 +42,7 @@ import javax.annotation.Nullable;
 
 import java.util.Properties;
 
-public class KafkaTable implements FlinkStreamSink<Row, Row> {
+public class KafkaTable implements FlinkStreamSink {
 
     private static final long serialVersionUID = 3980751499724935230L;
     private Config config;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
index a2e2721..42e1fa5 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
@@ -46,7 +46,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Properties;
 
-public class KafkaTableStream implements FlinkStreamSource<Row> {
+public class KafkaTableStream implements FlinkStreamSource {
 
     private static final long   serialVersionUID = 5287018194573371428L;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaTableStream.class);
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
index ae37a3e..549bf8d 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 
-public class SocketStream implements FlinkStreamSource<Row> {
+public class SocketStream implements FlinkStreamSource {
 
     private static final long serialVersionUID = 986629276153771291L;
     private Config config;
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
index d112515..261e4dd 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.command.FlinkCommandArgs;
 import org.apache.seatunnel.config.ConfigBuilder;
 import org.apache.seatunnel.config.PluginType;
 import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.env.RuntimeEnv;
 
 import java.util.List;
 
@@ -37,9 +38,9 @@ public class FlinkTaskExecuteCommand extends 
BaseTaskExecuteCommand<FlinkCommand
     public void execute(FlinkCommandArgs flinkCommandArgs) {
         ConfigBuilder configBuilder = new 
ConfigBuilder(flinkCommandArgs.getConfigFile(), 
flinkCommandArgs.getEngineType());
 
-        List<BaseSource> sources = 
configBuilder.createPlugins(PluginType.SOURCE);
-        List<BaseTransform> transforms = 
configBuilder.createPlugins(PluginType.TRANSFORM);
-        List<BaseSink> sinks = configBuilder.createPlugins(PluginType.SINK);
+        List<BaseSource<RuntimeEnv>> sources = 
configBuilder.createPlugins(PluginType.SOURCE);
+        List<BaseTransform<RuntimeEnv>> transforms = 
configBuilder.createPlugins(PluginType.TRANSFORM);
+        List<BaseSink<RuntimeEnv>> sinks = 
configBuilder.createPlugins(PluginType.SINK);
         Execution execution = configBuilder.createExecution();
         baseCheckConfig(sources, transforms, sinks);
         prepare(configBuilder.getEnv(), sources, transforms, sinks);
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
index abf3a3e..44e0b53 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.config;
 
+import org.apache.seatunnel.apis.BaseSink;
+import org.apache.seatunnel.apis.BaseSource;
+import org.apache.seatunnel.apis.BaseTransform;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
 import org.apache.seatunnel.env.Execution;
 import org.apache.seatunnel.env.RuntimeEnv;
@@ -175,7 +178,7 @@ public class ConfigBuilder {
         this.createPlugins(PluginType.SINK);
     }
 
-    public <T extends Plugin<?>> List<T> createPlugins(PluginType type) {
+    public <T extends Plugin<? extends RuntimeEnv>> List<T> 
createPlugins(PluginType type) {
         Objects.requireNonNull(type, "PluginType can not be null when create 
plugins!");
         List<T> basePluginList = new ArrayList<>();
         List<? extends Config> configList = 
config.getConfigList(type.getType());
@@ -212,8 +215,14 @@ public class ConfigBuilder {
         return env;
     }
 
-    public Execution createExecution() {
-        Execution execution = null;
+    public Execution<
+        ? extends BaseSource<? extends RuntimeEnv>,
+        ? extends BaseTransform<? extends RuntimeEnv>,
+        ? extends BaseSink<? extends RuntimeEnv>> createExecution() {
+        Execution<
+            ? extends BaseSource<? extends RuntimeEnv>,
+            ? extends BaseTransform<? extends RuntimeEnv>,
+            ? extends BaseSink<? extends RuntimeEnv>> execution = null;
         switch (engine) {
             case SPARK:
                 SparkEnvironment sparkEnvironment = (SparkEnvironment) env;
diff --git 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
index 852ba27..7ce02aa 100644
--- 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
+++ 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
-public class DataStreamToTable implements FlinkStreamTransform<Row, Row>, 
FlinkBatchTransform<Row, Row> {
+public class DataStreamToTable implements FlinkStreamTransform, 
FlinkBatchTransform {
 
     private static final long serialVersionUID = -7861928245025199286L;
     private Config config;
diff --git 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
index 1b498b5..2292967 100644
--- 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
+++ 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
@@ -34,7 +34,7 @@ import org.apache.flink.types.Row;
 
 import java.util.List;
 
-public class Split implements FlinkStreamTransform<Row, Row>, 
FlinkBatchTransform<Row, Row> {
+public class Split implements FlinkStreamTransform, FlinkBatchTransform {
 
     private Config config;
 
diff --git 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
index 6364a23..4594fd44 100644
--- 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
+++ 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
-public class Sql implements FlinkStreamTransform<Row, Row>, 
FlinkBatchTransform<Row, Row> {
+public class Sql implements FlinkStreamTransform, FlinkBatchTransform {
 
     private String sql;
 
diff --git 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
index 140745d..a504a38 100644
--- 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
+++ 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
-public class TableToDataStream implements FlinkStreamTransform<Row, Row>, 
FlinkBatchTransform<Row, Row> {
+public class TableToDataStream implements FlinkStreamTransform, 
FlinkBatchTransform {
 
     private static final long serialVersionUID = 4556842426965038124L;
     private Config config;

Reply via email to