This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 625db5f  [Improvement][core] Optimize generic type (#1489)
625db5f is described below

commit 625db5f17b5efe35453d403aeb8812e4b731208e
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Mar 16 14:59:43 2022 +0800

    [Improvement][core] Optimize generic type (#1489)
    
    * Optimize generic type
    
    * fix code style
    
    * Remove RuntimeEnv implement with Plugin
---
 .../java/org/apache/seatunnel/env/Execution.java   |  7 +++--
 .../java/org/apache/seatunnel/env/RuntimeEnv.java  | 14 +++++++--
 .../java/org/apache/seatunnel/plugin/Plugin.java   |  3 +-
 .../org/apache/seatunnel/flink/BaseFlinkSink.java  |  1 +
 .../apache/seatunnel/flink/FlinkEnvironment.java   | 12 ++++++--
 .../seatunnel/flink/batch/FlinkBatchExecution.java | 14 ++++-----
 .../flink/stream/FlinkStreamExecution.java         |  4 +--
 .../apache/seatunnel/spark/BaseSparkSource.java    |  4 +--
 .../apache/seatunnel/spark/SparkEnvironment.java   |  6 ++--
 .../seatunnel/spark/batch/SparkBatchExecution.java |  4 +--
 .../StructuredStreamingExecution.java              |  4 +--
 .../spark/stream/SparkStreamingExecution.scala     |  6 ++--
 .../seatunnel/flink/source/FileSourceTest.java     |  8 ++---
 .../org/apache/seatunnel/spark/source/Fake.scala   |  2 +-
 .../org/apache/seatunnel/spark/sink/File.scala     |  9 +++---
 .../org/apache/seatunnel/spark/source/File.scala   |  6 ++--
 .../org/apache/seatunnel/spark/sink/Hudi.scala     |  4 +--
 .../org/apache/seatunnel/spark/source/Hudi.scala   |  4 +--
 .../org/apache/seatunnel/spark/sink/Kudu.scala     |  3 +-
 .../seatunnel/command/BaseTaskExecuteCommand.java  | 18 ++++++-----
 .../command/flink/FlinkConfValidateCommand.java    |  3 +-
 .../command/flink/FlinkTaskExecuteCommand.java     | 17 ++++++-----
 .../command/spark/SparkConfValidateCommand.java    |  3 +-
 .../command/spark/SparkTaskExecuteCommand.java     | 15 ++++++----
 .../org/apache/seatunnel/config/ConfigBuilder.java | 35 +++++++++-------------
 25 files changed, 115 insertions(+), 91 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
index b6f4167..192903c 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
@@ -28,9 +28,10 @@ import java.util.List;
  * the SeaTunnel job's execution context
  */
 public interface Execution<
-    SR extends BaseSource<? extends RuntimeEnv>,
-    TF extends BaseTransform<? extends RuntimeEnv>,
-    SK extends BaseSink<? extends RuntimeEnv>> extends Plugin<Void> {
+    SR extends BaseSource<RE>,
+    TF extends BaseTransform<RE>,
+    SK extends BaseSink<RE>,
+    RE extends RuntimeEnv> extends Plugin<RE> {
 
     /**
      * start to execute the SeaTunnel job
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
index f582c8d..5c8ea3c 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
@@ -17,11 +17,21 @@
 
 package org.apache.seatunnel.env;
 
-import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.common.config.CheckResult;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 /**
  * engine related runtime environment
  */
-public interface RuntimeEnv extends Plugin<Boolean> {
+public interface RuntimeEnv {
+
+    RuntimeEnv setConfig(Config config);
+
+    Config getConfig();
+
+    CheckResult checkConfig();
+
+    RuntimeEnv prepare();
 
 }
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
index c2bba47..5f1bc17 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.plugin;
 
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.env.RuntimeEnv;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -26,7 +27,7 @@ import java.io.Serializable;
 /**
  * a base interface indicates belonging to SeaTunnel.
  */
-public interface Plugin<T> extends Serializable {
+public interface Plugin<T extends RuntimeEnv> extends Serializable {
     String RESULT_TABLE_NAME = "result_table_name";
     String SOURCE_TABLE_NAME = "source_table_name";
 
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java
index 2ab8bac..0daa07b 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java
@@ -23,4 +23,5 @@ import org.apache.seatunnel.apis.BaseSink;
  * a base interface indicates a sink plugin running on Flink.
  */
 public interface BaseFlinkSink extends BaseSink<FlinkEnvironment> {
+
 }
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 188fc0f..5875436 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -60,8 +60,9 @@ public class FlinkEnvironment implements RuntimeEnv {
     private String jobName = "seatunnel";
 
     @Override
-    public void setConfig(Config config) {
+    public FlinkEnvironment setConfig(Config config) {
         this.config = config;
+        return this;
     }
 
     @Override
@@ -75,8 +76,7 @@ public class FlinkEnvironment implements RuntimeEnv {
     }
 
     @Override
-    public void prepare(Boolean isStreaming) {
-        this.isStreaming = isStreaming;
+    public FlinkEnvironment prepare() {
         if (isStreaming) {
             createStreamEnvironment();
             createStreamTableEnvironment();
@@ -87,6 +87,7 @@ public class FlinkEnvironment implements RuntimeEnv {
         if (config.hasPath("job.name")) {
             jobName = config.getString("job.name");
         }
+        return this;
     }
 
     public String getJobName() {
@@ -97,6 +98,11 @@ public class FlinkEnvironment implements RuntimeEnv {
         return isStreaming;
     }
 
+    public FlinkEnvironment setStreaming(boolean isStreaming) {
+        this.isStreaming = isStreaming;
+        return this;
+    }
+
     public StreamExecutionEnvironment getStreamExecutionEnvironment() {
         return environment;
     }
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 017f5bd..c81a8cc 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -36,7 +36,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
-public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBatchTransform, FlinkBatchSink> {
+public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBatchTransform, FlinkBatchSink, FlinkEnvironment> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FlinkBatchExecution.class);
 
@@ -82,13 +82,13 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
         }
     }
 
-    private void registerResultTable(Config sourceConfig, DataSet<Row> dataSet) {
-        if (sourceConfig.hasPath(RESULT_TABLE_NAME)) {
-            String name = sourceConfig.getString(RESULT_TABLE_NAME);
+    private void registerResultTable(Config pluginConfig, DataSet<Row> dataSet) {
+        if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
+            String name = pluginConfig.getString(RESULT_TABLE_NAME);
             BatchTableEnvironment tableEnvironment = flinkEnvironment.getBatchTableEnvironment();
             if (!TableUtil.tableExists(tableEnvironment, name)) {
-                if (sourceConfig.hasPath("field_name")) {
-                    String fieldName = sourceConfig.getString("field_name");
+                if (pluginConfig.hasPath("field_name")) {
+                    String fieldName = pluginConfig.getString("field_name");
                     tableEnvironment.registerDataSet(name, dataSet, fieldName);
                 } else {
                     tableEnvironment.registerDataSet(name, dataSet);
@@ -122,6 +122,6 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
     }
 
     @Override
-    public void prepare(Void prepareEnv) {
+    public void prepare(FlinkEnvironment prepareEnv) {
     }
 }
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index 753050e..2c5f124 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -36,7 +36,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
-public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkStreamTransform, FlinkStreamSink> {
+public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkStreamTransform, FlinkStreamSink, FlinkEnvironment> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStreamExecution.class);
 
@@ -121,6 +121,6 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
     }
 
     @Override
-    public void prepare(Void prepareEnv) {
+    public void prepare(FlinkEnvironment prepareEnv) {
     }
 }
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
index 391a791..47ac1e5 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
@@ -25,7 +25,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 /**
  * a base interface indicates a source plugin running on Spark.
  */
-public abstract class BaseSparkSource<T> implements BaseSource<SparkEnvironment> {
+public abstract class BaseSparkSource<OUT> implements BaseSource<SparkEnvironment> {
 
     protected Config config = ConfigFactory.empty();
 
@@ -39,5 +39,5 @@ public abstract class BaseSparkSource<T> implements BaseSource<SparkEnvironment>
         return this.config;
     }
 
-    public abstract T getData(SparkEnvironment env);
+    public abstract OUT getData(SparkEnvironment env);
 }
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index cad0af0..f0b638b 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -46,8 +46,9 @@ public class SparkEnvironment implements RuntimeEnv {
     }
 
     @Override
-    public void setConfig(Config config) {
+    public SparkEnvironment setConfig(Config config) {
         this.config = config;
+        return this;
     }
 
     @Override
@@ -61,7 +62,7 @@ public class SparkEnvironment implements RuntimeEnv {
     }
 
     @Override
-    public void prepare(Boolean prepareEnv) {
+    public SparkEnvironment prepare() {
         SparkConf sparkConf = createSparkConf();
         SparkSession.Builder builder = SparkSession.builder().config(sparkConf);
         if (enableHive) {
@@ -69,6 +70,7 @@ public class SparkEnvironment implements RuntimeEnv {
         }
         this.sparkSession = builder.getOrCreate();
         createStreamingContext();
+        return this;
     }
 
     public SparkSession getSparkSession() {
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
index 6a81220..5674c28 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
@@ -33,7 +33,7 @@ import org.apache.spark.sql.Row;
 
 import java.util.List;
 
-public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSparkTransform, SparkBatchSink> {
+public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSparkTransform, SparkBatchSink, SparkEnvironment> {
 
     private final SparkEnvironment environment;
 
@@ -123,7 +123,7 @@ public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSpar
     }
 
     @Override
-    public void prepare(Void prepareEnv) {
+    public void prepare(SparkEnvironment prepareEnv) {
 
     }
 }
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
index 7e08175..8545b2a 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import java.util.List;
 
-public class StructuredStreamingExecution implements Execution<StructuredStreamingSource, BaseSparkTransform, StructuredStreamingSink> {
+public class StructuredStreamingExecution implements Execution<StructuredStreamingSource, BaseSparkTransform, StructuredStreamingSink, SparkEnvironment> {
 
     private final SparkEnvironment sparkEnvironment;
 
@@ -58,7 +58,7 @@ public class StructuredStreamingExecution implements Execution<StructuredStreami
     }
 
     @Override
-    public void prepare(Void prepareEnv) {
+    public void prepare(SparkEnvironment prepareEnv) {
 
     }
 }
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
index 107ff27..2ffaf15 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
@@ -17,9 +17,9 @@
 package org.apache.seatunnel.spark.stream
 
 import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
 import org.apache.seatunnel.env.Execution
 import org.apache.seatunnel.plugin.Plugin
+import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
 import org.apache.seatunnel.spark.batch.SparkBatchExecution
 import org.apache.seatunnel.spark.{BaseSparkSink, BaseSparkSource, BaseSparkTransform, SparkEnvironment}
 import org.apache.spark.sql.{Dataset, Row}
@@ -28,7 +28,7 @@ import java.util.{List => JList}
 import scala.collection.JavaConversions._
 
 class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
-  extends Execution[BaseSparkSource[_], BaseSparkTransform, BaseSparkSink[_]] {
+  extends Execution[BaseSparkSource[_], BaseSparkTransform, BaseSparkSink[_], SparkEnvironment] {
 
   private var config = ConfigFactory.empty()
 
@@ -79,5 +79,5 @@ class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
 
   override def checkConfig(): CheckResult = CheckResult.success()
 
-  override def prepare(void: Void): Unit = {}
+  override def prepare(void: SparkEnvironment): Unit = {}
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
index 3a9a8f7..b7913d6 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
@@ -61,10 +61,10 @@ public class FileSourceTest {
     private FlinkEnvironment createFlinkStreamEnvironment(String configFile) {
         Config rootConfig = getRootConfig(configFile);
 
-        FlinkEnvironment flinkEnvironment = new FlinkEnvironment();
-        flinkEnvironment.setConfig(rootConfig);
-        flinkEnvironment.prepare(false);
-        return flinkEnvironment;
+        return new FlinkEnvironment()
+            .setStreaming(false)
+            .setConfig(rootConfig)
+            .prepare();
     }
 
     private FileSource createFileSource(String configFile, FlinkEnvironment flinkEnvironment) {
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
index c186ded..8d7f461 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
@@ -19,9 +19,9 @@ package org.apache.seatunnel.spark.source
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
-import org.apache.spark.sql.{Dataset, Row, RowFactory}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types.{DataTypes, StructType}
+import org.apache.spark.sql.{Dataset, Row, RowFactory}
 
 class Fake extends SparkBatchSource {
 
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
index 8fa0c4f..1ed1b7e 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
@@ -16,11 +16,6 @@
  */
 package org.apache.seatunnel.spark.sink
 
-import java.util
-
-import scala.collection.JavaConversions._
-import scala.util.{Failure, Success, Try}
-
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfigThrowable
@@ -31,6 +26,10 @@ import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.spark.sql.{Dataset, Row}
 
+import java.util
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+
 class File extends SparkBatchSink {
 
   override def checkConfig(): CheckResult = {
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
index e68f816..7ebb999 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
@@ -16,9 +16,6 @@
  */
 package org.apache.seatunnel.spark.source
 
-import scala.collection.JavaConversions._
-import scala.util.{Failure, Success, Try}
-
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfigThrowable
@@ -27,6 +24,9 @@ import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
 import org.apache.spark.sql.{Dataset, Row}
 
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+
 class File extends SparkBatchSource {
 
   override def prepare(env: SparkEnvironment): Unit = {}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/sink/Hudi.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/sink/Hudi.scala
index 1ec4eb7..7e98bc3 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/sink/Hudi.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/sink/Hudi.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.seatunnel.spark.sink
 
-import scala.collection.JavaConversions._
-
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
@@ -25,6 +23,8 @@ import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.spark.sql.{Dataset, Row}
 
+import scala.collection.JavaConversions._
+
 class Hudi extends SparkBatchSink {
 
   override def checkConfig(): CheckResult = {
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/source/Hudi.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/source/Hudi.scala
index ceeb49b..d4562ea 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/source/Hudi.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/source/Hudi.scala
@@ -17,13 +17,13 @@
 package org.apache.seatunnel.spark.source
 
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-
-import scala.collection.JavaConversions._
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
 import org.apache.spark.sql.{Dataset, Row}
 
+import scala.collection.JavaConversions._
+
 class Hudi extends SparkBatchSource {
 
   override def prepare(env: SparkEnvironment): Unit = {}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
index 2ce7bba..152676a 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.spark.sink
 
-import scala.collection.JavaConversions._
 import org.apache.kudu.spark.kudu._
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
@@ -26,6 +25,8 @@ import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.spark.sql.{Dataset, Row}
 
+import scala.collection.JavaConversions._
+
 class Kudu extends SparkBatchSink {
 
   override def prepare(env: SparkEnvironment): Unit = {
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
index ff00e4b..c9bf805 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -45,7 +45,7 @@ import java.util.Optional;
  *
  * @param <T> command args.
  */
-public abstract class BaseTaskExecuteCommand<T extends CommandArgs> implements Command<T> {
+public abstract class BaseTaskExecuteCommand<T extends CommandArgs, E extends RuntimeEnv> implements Command<T> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(BaseTaskExecuteCommand.class);
 
@@ -54,7 +54,8 @@ public abstract class BaseTaskExecuteCommand<T extends CommandArgs> implements C
      *
      * @param plugins plugin list.
      */
-    protected void baseCheckConfig(List<? extends Plugin>... plugins) {
+    @SafeVarargs
+    protected final void baseCheckConfig(List<? extends Plugin<E>>... plugins) {
         pluginCheck(plugins);
         deployModeCheck();
     }
@@ -62,11 +63,12 @@ public abstract class BaseTaskExecuteCommand<T extends CommandArgs> implements C
     /**
      * Execute prepare method defined in {@link Plugin}.
      *
-     * @param env runtimeEnv
+     * @param env     runtimeEnv
      * @param plugins plugin list
      */
-    protected void prepare(RuntimeEnv env, List<? extends Plugin>... plugins) {
-        for (List<? extends Plugin> pluginList : plugins) {
+    @SafeVarargs
+    protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
+        for (List<? extends Plugin<E>> pluginList : plugins) {
             pluginList.forEach(plugin -> plugin.prepare(env));
         }
     }
@@ -86,9 +88,9 @@ public abstract class BaseTaskExecuteCommand<T extends CommandArgs> implements C
      *
      * @param plugins plugin list
      */
-    private void pluginCheck(List<? extends Plugin>... plugins) {
-        for (List<? extends Plugin> pluginList : plugins) {
-            for (Plugin plugin : pluginList) {
+    private void pluginCheck(List<? extends Plugin<E>>... plugins) {
+        for (List<? extends Plugin<E>> pluginList : plugins) {
+            for (Plugin<E> plugin : pluginList) {
                 CheckResult checkResult;
                 try {
                     checkResult = plugin.checkConfig();
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
index 02086b2..c05b4ce 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.command.flink;
 import org.apache.seatunnel.command.Command;
 import org.apache.seatunnel.command.FlinkCommandArgs;
 import org.apache.seatunnel.config.ConfigBuilder;
+import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +35,7 @@ public class FlinkConfValidateCommand implements Command<FlinkCommandArgs> {
     @Override
     public void execute(FlinkCommandArgs flinkCommandArgs) {
         String configPath = flinkCommandArgs.getConfigFile();
-        new ConfigBuilder(configPath, flinkCommandArgs.getEngineType()).checkConfig();
+        new ConfigBuilder<FlinkEnvironment>(configPath, flinkCommandArgs.getEngineType()).checkConfig();
         LOGGER.info("config OK !");
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
index 261e4dd..f3f7a06 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -25,23 +25,26 @@ import org.apache.seatunnel.command.FlinkCommandArgs;
 import org.apache.seatunnel.config.ConfigBuilder;
 import org.apache.seatunnel.config.PluginType;
 import org.apache.seatunnel.env.Execution;
-import org.apache.seatunnel.env.RuntimeEnv;
+import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import java.util.List;
 
 /**
  * Used to execute Flink Job.
  */
-public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs> {
+public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
 
     @Override
     public void execute(FlinkCommandArgs flinkCommandArgs) {
-        ConfigBuilder configBuilder = new ConfigBuilder(flinkCommandArgs.getConfigFile(), flinkCommandArgs.getEngineType());
+        ConfigBuilder<FlinkEnvironment> configBuilder = new ConfigBuilder<>(flinkCommandArgs.getConfigFile(), flinkCommandArgs.getEngineType());
+
+        List<BaseSource<FlinkEnvironment>> sources = configBuilder.createPlugins(PluginType.SOURCE);
+        List<BaseTransform<FlinkEnvironment>> transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
+        List<BaseSink<FlinkEnvironment>> sinks = configBuilder.createPlugins(PluginType.SINK);
+
+        Execution<BaseSource<FlinkEnvironment>, BaseTransform<FlinkEnvironment>, BaseSink<FlinkEnvironment>, FlinkEnvironment>
+            execution = configBuilder.createExecution();
 
-        List<BaseSource<RuntimeEnv>> sources = configBuilder.createPlugins(PluginType.SOURCE);
-        List<BaseTransform<RuntimeEnv>> transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
-        List<BaseSink<RuntimeEnv>> sinks = configBuilder.createPlugins(PluginType.SINK);
-        Execution execution = configBuilder.createExecution();
         baseCheckConfig(sources, transforms, sinks);
         prepare(configBuilder.getEnv(), sources, transforms, sinks);
         showAsciiLogo();
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
index bfa9f78..6aa3f94 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.command.Command;
 import org.apache.seatunnel.command.SparkCommandArgs;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.config.ConfigBuilder;
+import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +43,7 @@ public class SparkConfValidateCommand implements 
Command<SparkCommandArgs> {
         } else {
             confPath = sparkCommandArgs.getConfigFile();
         }
-        new ConfigBuilder(confPath, 
sparkCommandArgs.getEngineType()).checkConfig();
+        new ConfigBuilder<SparkEnvironment>(confPath, 
sparkCommandArgs.getEngineType()).checkConfig();
         LOGGER.info("config OK !");
     }
 
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
index f155839..912023f 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
@@ -25,20 +25,23 @@ import org.apache.seatunnel.command.SparkCommandArgs;
 import org.apache.seatunnel.config.ConfigBuilder;
 import org.apache.seatunnel.config.PluginType;
 import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.spark.SparkEnvironment;
 
 import java.util.List;
 
-public class SparkTaskExecuteCommand extends 
BaseTaskExecuteCommand<SparkCommandArgs> {
+public class SparkTaskExecuteCommand extends 
BaseTaskExecuteCommand<SparkCommandArgs, SparkEnvironment> {
 
     @Override
     public void execute(SparkCommandArgs sparkCommandArgs) {
         String confFile = sparkCommandArgs.getConfigFile();
 
-        ConfigBuilder configBuilder = new ConfigBuilder(confFile, 
sparkCommandArgs.getEngineType());
-        List<BaseSource> sources = 
configBuilder.createPlugins(PluginType.SOURCE);
-        List<BaseTransform> transforms = 
configBuilder.createPlugins(PluginType.TRANSFORM);
-        List<BaseSink> sinks = configBuilder.createPlugins(PluginType.SINK);
-        Execution execution = configBuilder.createExecution();
+        ConfigBuilder<SparkEnvironment> configBuilder = new 
ConfigBuilder<>(confFile, sparkCommandArgs.getEngineType());
+        List<BaseSource<SparkEnvironment>> sources = 
configBuilder.createPlugins(PluginType.SOURCE);
+        List<BaseTransform<SparkEnvironment>> transforms = 
configBuilder.createPlugins(PluginType.TRANSFORM);
+        List<BaseSink<SparkEnvironment>> sinks = 
configBuilder.createPlugins(PluginType.SINK);
+
+        Execution<BaseSource<SparkEnvironment>, 
BaseTransform<SparkEnvironment>, BaseSink<SparkEnvironment>, SparkEnvironment>
+            execution = configBuilder.createExecution();
         baseCheckConfig(sources, transforms, sinks);
         prepare(configBuilder.getEnv(), sources, transforms, sinks);
         showAsciiLogo();
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
index 44e0b53..cad5440 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
@@ -47,19 +47,19 @@ import java.util.Objects;
 import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
 
-public class ConfigBuilder {
+public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigBuilder.class);
 
     private static final String PLUGIN_NAME_KEY = "plugin_name";
     private final String configFile;
     private final EngineType engine;
-    private ConfigPackage configPackage;
+    private final ConfigPackage configPackage;
     private final Config config;
     private boolean streaming;
     private Config envConfig;
     private boolean enableHive;
-    private final RuntimeEnv env;
+    private final ENVIRONMENT env;
 
     public ConfigBuilder(String configFile, EngineType engine) {
         this.configFile = configFile;
@@ -94,7 +94,7 @@ public class ConfigBuilder {
         return envConfig;
     }
 
-    public RuntimeEnv getEnv() {
+    public ENVIRONMENT getEnv() {
         return env;
     }
 
@@ -123,7 +123,7 @@ public class ConfigBuilder {
     /**
      * create plugin class instance, ignore case.
      **/
-    private <T extends Plugin<?>> T createPluginInstanceIgnoreCase(String 
name, PluginType pluginType) throws Exception {
+    private <T extends Plugin<ENVIRONMENT>> T 
createPluginInstanceIgnoreCase(String name, PluginType pluginType) throws 
Exception {
         if (name.split("\\.").length != 1) {
             // canonical class name
             return (T) Class.forName(name).newInstance();
@@ -178,7 +178,7 @@ public class ConfigBuilder {
         this.createPlugins(PluginType.SINK);
     }
 
-    public <T extends Plugin<? extends RuntimeEnv>> List<T> 
createPlugins(PluginType type) {
+    public <T extends Plugin<ENVIRONMENT>> List<T> createPlugins(PluginType 
type) {
         Objects.requireNonNull(type, "PluginType can not be null when create 
plugins!");
         List<T> basePluginList = new ArrayList<>();
         List<? extends Config> configList = 
config.getConfigList(type.getType());
@@ -195,34 +195,27 @@ public class ConfigBuilder {
         return basePluginList;
     }
 
-    private RuntimeEnv createEnv() {
+    private ENVIRONMENT createEnv() {
         envConfig = config.getConfig("env");
         streaming = checkIsStreaming();
         enableHive = checkIsContainHive();
-        RuntimeEnv env = null;
+        ENVIRONMENT env;
         switch (engine) {
             case SPARK:
-                env = new SparkEnvironment().setEnableHive(enableHive);
+                env = (ENVIRONMENT) new 
SparkEnvironment().setEnableHive(enableHive);
                 break;
             case FLINK:
-                env = new FlinkEnvironment();
+                env = (ENVIRONMENT) new 
FlinkEnvironment().setStreaming(streaming);
                 break;
             default:
                 throw new IllegalArgumentException("Engine: " + engine + " is 
not supported");
         }
-        env.setConfig(envConfig);
-        env.prepare(streaming);
+        env.setConfig(envConfig).prepare();
         return env;
     }
 
-    public Execution<
-        ? extends BaseSource<? extends RuntimeEnv>,
-        ? extends BaseTransform<? extends RuntimeEnv>,
-        ? extends BaseSink<? extends RuntimeEnv>> createExecution() {
-        Execution<
-            ? extends BaseSource<? extends RuntimeEnv>,
-            ? extends BaseTransform<? extends RuntimeEnv>,
-            ? extends BaseSink<? extends RuntimeEnv>> execution = null;
+    public Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, 
BaseSink<ENVIRONMENT>, ENVIRONMENT> createExecution() {
+        Execution execution = null;
         switch (engine) {
             case SPARK:
                 SparkEnvironment sparkEnvironment = (SparkEnvironment) env;
@@ -243,7 +236,7 @@ public class ConfigBuilder {
             default:
                 break;
         }
-        return execution;
+        return (Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, 
BaseSink<ENVIRONMENT>, ENVIRONMENT>) execution;
     }
 
 }

Reply via email to