This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 625db5f [Improvement][core] Optimize generic type (#1489)
625db5f is described below
commit 625db5f17b5efe35453d403aeb8812e4b731208e
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Mar 16 14:59:43 2022 +0800
[Improvement][core] Optimize generic type (#1489)
* Optimize generic type
* fix code style
* Remove RuntimeEnv's inheritance from Plugin (RuntimeEnv no longer extends Plugin)
---
.../java/org/apache/seatunnel/env/Execution.java | 7 +++--
.../java/org/apache/seatunnel/env/RuntimeEnv.java | 14 +++++++--
.../java/org/apache/seatunnel/plugin/Plugin.java | 3 +-
.../org/apache/seatunnel/flink/BaseFlinkSink.java | 1 +
.../apache/seatunnel/flink/FlinkEnvironment.java | 12 ++++++--
.../seatunnel/flink/batch/FlinkBatchExecution.java | 14 ++++-----
.../flink/stream/FlinkStreamExecution.java | 4 +--
.../apache/seatunnel/spark/BaseSparkSource.java | 4 +--
.../apache/seatunnel/spark/SparkEnvironment.java | 6 ++--
.../seatunnel/spark/batch/SparkBatchExecution.java | 4 +--
.../StructuredStreamingExecution.java | 4 +--
.../spark/stream/SparkStreamingExecution.scala | 6 ++--
.../seatunnel/flink/source/FileSourceTest.java | 8 ++---
.../org/apache/seatunnel/spark/source/Fake.scala | 2 +-
.../org/apache/seatunnel/spark/sink/File.scala | 9 +++---
.../org/apache/seatunnel/spark/source/File.scala | 6 ++--
.../org/apache/seatunnel/spark/sink/Hudi.scala | 4 +--
.../org/apache/seatunnel/spark/source/Hudi.scala | 4 +--
.../org/apache/seatunnel/spark/sink/Kudu.scala | 3 +-
.../seatunnel/command/BaseTaskExecuteCommand.java | 18 ++++++-----
.../command/flink/FlinkConfValidateCommand.java | 3 +-
.../command/flink/FlinkTaskExecuteCommand.java | 17 ++++++-----
.../command/spark/SparkConfValidateCommand.java | 3 +-
.../command/spark/SparkTaskExecuteCommand.java | 15 ++++++----
.../org/apache/seatunnel/config/ConfigBuilder.java | 35 +++++++++-------------
25 files changed, 115 insertions(+), 91 deletions(-)
diff --git
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
index b6f4167..192903c 100644
---
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
+++
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
@@ -28,9 +28,10 @@ import java.util.List;
* the SeaTunnel job's execution context
*/
public interface Execution<
- SR extends BaseSource<? extends RuntimeEnv>,
- TF extends BaseTransform<? extends RuntimeEnv>,
- SK extends BaseSink<? extends RuntimeEnv>> extends Plugin<Void> {
+ SR extends BaseSource<RE>,
+ TF extends BaseTransform<RE>,
+ SK extends BaseSink<RE>,
+ RE extends RuntimeEnv> extends Plugin<RE> {
/**
* start to execute the SeaTunnel job
diff --git
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
index f582c8d..5c8ea3c 100644
---
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
+++
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
@@ -17,11 +17,21 @@
package org.apache.seatunnel.env;
-import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.common.config.CheckResult;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
/**
* engine related runtime environment
*/
-public interface RuntimeEnv extends Plugin<Boolean> {
+public interface RuntimeEnv {
+
+ RuntimeEnv setConfig(Config config);
+
+ Config getConfig();
+
+ CheckResult checkConfig();
+
+ RuntimeEnv prepare();
}
diff --git
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
index c2bba47..5f1bc17 100644
---
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
+++
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.plugin;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.env.RuntimeEnv;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -26,7 +27,7 @@ import java.io.Serializable;
/**
* a base interface indicates belonging to SeaTunnel.
*/
-public interface Plugin<T> extends Serializable {
+public interface Plugin<T extends RuntimeEnv> extends Serializable {
String RESULT_TABLE_NAME = "result_table_name";
String SOURCE_TABLE_NAME = "source_table_name";
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java
index 2ab8bac..0daa07b 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java
@@ -23,4 +23,5 @@ import org.apache.seatunnel.apis.BaseSink;
* a base interface indicates a sink plugin running on Flink.
*/
public interface BaseFlinkSink extends BaseSink<FlinkEnvironment> {
+
}
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 188fc0f..5875436 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -60,8 +60,9 @@ public class FlinkEnvironment implements RuntimeEnv {
private String jobName = "seatunnel";
@Override
- public void setConfig(Config config) {
+ public FlinkEnvironment setConfig(Config config) {
this.config = config;
+ return this;
}
@Override
@@ -75,8 +76,7 @@ public class FlinkEnvironment implements RuntimeEnv {
}
@Override
- public void prepare(Boolean isStreaming) {
- this.isStreaming = isStreaming;
+ public FlinkEnvironment prepare() {
if (isStreaming) {
createStreamEnvironment();
createStreamTableEnvironment();
@@ -87,6 +87,7 @@ public class FlinkEnvironment implements RuntimeEnv {
if (config.hasPath("job.name")) {
jobName = config.getString("job.name");
}
+ return this;
}
public String getJobName() {
@@ -97,6 +98,11 @@ public class FlinkEnvironment implements RuntimeEnv {
return isStreaming;
}
+ public FlinkEnvironment setStreaming(boolean isStreaming) {
+ this.isStreaming = isStreaming;
+ return this;
+ }
+
public StreamExecutionEnvironment getStreamExecutionEnvironment() {
return environment;
}
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 017f5bd..c81a8cc 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -36,7 +36,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-public class FlinkBatchExecution implements Execution<FlinkBatchSource,
FlinkBatchTransform, FlinkBatchSink> {
+public class FlinkBatchExecution implements Execution<FlinkBatchSource,
FlinkBatchTransform, FlinkBatchSink, FlinkEnvironment> {
private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkBatchExecution.class);
@@ -82,13 +82,13 @@ public class FlinkBatchExecution implements
Execution<FlinkBatchSource, FlinkBat
}
}
- private void registerResultTable(Config sourceConfig, DataSet<Row>
dataSet) {
- if (sourceConfig.hasPath(RESULT_TABLE_NAME)) {
- String name = sourceConfig.getString(RESULT_TABLE_NAME);
+ private void registerResultTable(Config pluginConfig, DataSet<Row>
dataSet) {
+ if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
+ String name = pluginConfig.getString(RESULT_TABLE_NAME);
BatchTableEnvironment tableEnvironment =
flinkEnvironment.getBatchTableEnvironment();
if (!TableUtil.tableExists(tableEnvironment, name)) {
- if (sourceConfig.hasPath("field_name")) {
- String fieldName = sourceConfig.getString("field_name");
+ if (pluginConfig.hasPath("field_name")) {
+ String fieldName = pluginConfig.getString("field_name");
tableEnvironment.registerDataSet(name, dataSet, fieldName);
} else {
tableEnvironment.registerDataSet(name, dataSet);
@@ -122,6 +122,6 @@ public class FlinkBatchExecution implements
Execution<FlinkBatchSource, FlinkBat
}
@Override
- public void prepare(Void prepareEnv) {
+ public void prepare(FlinkEnvironment prepareEnv) {
}
}
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index 753050e..2c5f124 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -36,7 +36,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-public class FlinkStreamExecution implements Execution<FlinkStreamSource,
FlinkStreamTransform, FlinkStreamSink> {
+public class FlinkStreamExecution implements Execution<FlinkStreamSource,
FlinkStreamTransform, FlinkStreamSink, FlinkEnvironment> {
private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkStreamExecution.class);
@@ -121,6 +121,6 @@ public class FlinkStreamExecution implements
Execution<FlinkStreamSource, FlinkS
}
@Override
- public void prepare(Void prepareEnv) {
+ public void prepare(FlinkEnvironment prepareEnv) {
}
}
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
index 391a791..47ac1e5 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
@@ -25,7 +25,7 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
/**
* a base interface indicates a source plugin running on Spark.
*/
-public abstract class BaseSparkSource<T> implements
BaseSource<SparkEnvironment> {
+public abstract class BaseSparkSource<OUT> implements
BaseSource<SparkEnvironment> {
protected Config config = ConfigFactory.empty();
@@ -39,5 +39,5 @@ public abstract class BaseSparkSource<T> implements
BaseSource<SparkEnvironment>
return this.config;
}
- public abstract T getData(SparkEnvironment env);
+ public abstract OUT getData(SparkEnvironment env);
}
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index cad0af0..f0b638b 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -46,8 +46,9 @@ public class SparkEnvironment implements RuntimeEnv {
}
@Override
- public void setConfig(Config config) {
+ public SparkEnvironment setConfig(Config config) {
this.config = config;
+ return this;
}
@Override
@@ -61,7 +62,7 @@ public class SparkEnvironment implements RuntimeEnv {
}
@Override
- public void prepare(Boolean prepareEnv) {
+ public SparkEnvironment prepare() {
SparkConf sparkConf = createSparkConf();
SparkSession.Builder builder =
SparkSession.builder().config(sparkConf);
if (enableHive) {
@@ -69,6 +70,7 @@ public class SparkEnvironment implements RuntimeEnv {
}
this.sparkSession = builder.getOrCreate();
createStreamingContext();
+ return this;
}
public SparkSession getSparkSession() {
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
index 6a81220..5674c28 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
@@ -33,7 +33,7 @@ import org.apache.spark.sql.Row;
import java.util.List;
-public class SparkBatchExecution implements Execution<SparkBatchSource,
BaseSparkTransform, SparkBatchSink> {
+public class SparkBatchExecution implements Execution<SparkBatchSource,
BaseSparkTransform, SparkBatchSink, SparkEnvironment> {
private final SparkEnvironment environment;
@@ -123,7 +123,7 @@ public class SparkBatchExecution implements
Execution<SparkBatchSource, BaseSpar
}
@Override
- public void prepare(Void prepareEnv) {
+ public void prepare(SparkEnvironment prepareEnv) {
}
}
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
index 7e08175..8545b2a 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
@@ -27,7 +27,7 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import java.util.List;
-public class StructuredStreamingExecution implements
Execution<StructuredStreamingSource, BaseSparkTransform,
StructuredStreamingSink> {
+public class StructuredStreamingExecution implements
Execution<StructuredStreamingSource, BaseSparkTransform,
StructuredStreamingSink, SparkEnvironment> {
private final SparkEnvironment sparkEnvironment;
@@ -58,7 +58,7 @@ public class StructuredStreamingExecution implements
Execution<StructuredStreami
}
@Override
- public void prepare(Void prepareEnv) {
+ public void prepare(SparkEnvironment prepareEnv) {
}
}
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
index 107ff27..2ffaf15 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
@@ -17,9 +17,9 @@
package org.apache.seatunnel.spark.stream
import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
import org.apache.seatunnel.env.Execution
import org.apache.seatunnel.plugin.Plugin
+import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
import org.apache.seatunnel.spark.batch.SparkBatchExecution
import org.apache.seatunnel.spark.{BaseSparkSink, BaseSparkSource,
BaseSparkTransform, SparkEnvironment}
import org.apache.spark.sql.{Dataset, Row}
@@ -28,7 +28,7 @@ import java.util.{List => JList}
import scala.collection.JavaConversions._
class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
- extends Execution[BaseSparkSource[_], BaseSparkTransform, BaseSparkSink[_]] {
+ extends Execution[BaseSparkSource[_], BaseSparkTransform, BaseSparkSink[_],
SparkEnvironment] {
private var config = ConfigFactory.empty()
@@ -79,5 +79,5 @@ class SparkStreamingExecution(sparkEnvironment:
SparkEnvironment)
override def checkConfig(): CheckResult = CheckResult.success()
- override def prepare(void: Void): Unit = {}
+ override def prepare(void: SparkEnvironment): Unit = {}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
index 3a9a8f7..b7913d6 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
@@ -61,10 +61,10 @@ public class FileSourceTest {
private FlinkEnvironment createFlinkStreamEnvironment(String configFile) {
Config rootConfig = getRootConfig(configFile);
- FlinkEnvironment flinkEnvironment = new FlinkEnvironment();
- flinkEnvironment.setConfig(rootConfig);
- flinkEnvironment.prepare(false);
- return flinkEnvironment;
+ return new FlinkEnvironment()
+ .setStreaming(false)
+ .setConfig(rootConfig)
+ .prepare();
}
private FileSource createFileSource(String configFile, FlinkEnvironment
flinkEnvironment) {
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
index c186ded..8d7f461 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
@@ -19,9 +19,9 @@ package org.apache.seatunnel.spark.source
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
-import org.apache.spark.sql.{Dataset, Row, RowFactory}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, StructType}
+import org.apache.spark.sql.{Dataset, Row, RowFactory}
class Fake extends SparkBatchSource {
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
index 8fa0c4f..1ed1b7e 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/File.scala
@@ -16,11 +16,6 @@
*/
package org.apache.seatunnel.spark.sink
-import java.util
-
-import scala.collection.JavaConversions._
-import scala.util.{Failure, Success, Try}
-
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import
org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfigThrowable
@@ -31,6 +26,10 @@ import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.spark.sql.{Dataset, Row}
+import java.util
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+
class File extends SparkBatchSink {
override def checkConfig(): CheckResult = {
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
index e68f816..7ebb999 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
@@ -16,9 +16,6 @@
*/
package org.apache.seatunnel.spark.source
-import scala.collection.JavaConversions._
-import scala.util.{Failure, Success, Try}
-
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import
org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfigThrowable
@@ -27,6 +24,9 @@ import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
import org.apache.spark.sql.{Dataset, Row}
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+
class File extends SparkBatchSource {
override def prepare(env: SparkEnvironment): Unit = {}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/sink/Hudi.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/sink/Hudi.scala
index 1ec4eb7..7e98bc3 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/sink/Hudi.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/sink/Hudi.scala
@@ -16,8 +16,6 @@
*/
package org.apache.seatunnel.spark.sink
-import scala.collection.JavaConversions._
-
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
@@ -25,6 +23,8 @@ import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.spark.sql.{Dataset, Row}
+import scala.collection.JavaConversions._
+
class Hudi extends SparkBatchSink {
override def checkConfig(): CheckResult = {
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/source/Hudi.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/source/Hudi.scala
index ceeb49b..d4562ea 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/source/Hudi.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/source/Hudi.scala
@@ -17,13 +17,13 @@
package org.apache.seatunnel.spark.source
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-
-import scala.collection.JavaConversions._
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
import org.apache.spark.sql.{Dataset, Row}
+import scala.collection.JavaConversions._
+
class Hudi extends SparkBatchSource {
override def prepare(env: SparkEnvironment): Unit = {}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
index 2ce7bba..152676a 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
@@ -17,7 +17,6 @@
package org.apache.seatunnel.spark.sink
-import scala.collection.JavaConversions._
import org.apache.kudu.spark.kudu._
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
@@ -26,6 +25,8 @@ import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.spark.sql.{Dataset, Row}
+import scala.collection.JavaConversions._
+
class Kudu extends SparkBatchSink {
override def prepare(env: SparkEnvironment): Unit = {
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
index ff00e4b..c9bf805 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -45,7 +45,7 @@ import java.util.Optional;
*
* @param <T> command args.
*/
-public abstract class BaseTaskExecuteCommand<T extends CommandArgs> implements
Command<T> {
+public abstract class BaseTaskExecuteCommand<T extends CommandArgs, E extends
RuntimeEnv> implements Command<T> {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseTaskExecuteCommand.class);
@@ -54,7 +54,8 @@ public abstract class BaseTaskExecuteCommand<T extends
CommandArgs> implements C
*
* @param plugins plugin list.
*/
- protected void baseCheckConfig(List<? extends Plugin>... plugins) {
+ @SafeVarargs
+ protected final void baseCheckConfig(List<? extends Plugin<E>>... plugins)
{
pluginCheck(plugins);
deployModeCheck();
}
@@ -62,11 +63,12 @@ public abstract class BaseTaskExecuteCommand<T extends
CommandArgs> implements C
/**
* Execute prepare method defined in {@link Plugin}.
*
- * @param env runtimeEnv
+ * @param env runtimeEnv
* @param plugins plugin list
*/
- protected void prepare(RuntimeEnv env, List<? extends Plugin>... plugins) {
- for (List<? extends Plugin> pluginList : plugins) {
+ @SafeVarargs
+ protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
+ for (List<? extends Plugin<E>> pluginList : plugins) {
pluginList.forEach(plugin -> plugin.prepare(env));
}
}
@@ -86,9 +88,9 @@ public abstract class BaseTaskExecuteCommand<T extends
CommandArgs> implements C
*
* @param plugins plugin list
*/
- private void pluginCheck(List<? extends Plugin>... plugins) {
- for (List<? extends Plugin> pluginList : plugins) {
- for (Plugin plugin : pluginList) {
+ private void pluginCheck(List<? extends Plugin<E>>... plugins) {
+ for (List<? extends Plugin<E>> pluginList : plugins) {
+ for (Plugin<E> plugin : pluginList) {
CheckResult checkResult;
try {
checkResult = plugin.checkConfig();
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
index 02086b2..c05b4ce 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.command.flink;
import org.apache.seatunnel.command.Command;
import org.apache.seatunnel.command.FlinkCommandArgs;
import org.apache.seatunnel.config.ConfigBuilder;
+import org.apache.seatunnel.flink.FlinkEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +35,7 @@ public class FlinkConfValidateCommand implements
Command<FlinkCommandArgs> {
@Override
public void execute(FlinkCommandArgs flinkCommandArgs) {
String configPath = flinkCommandArgs.getConfigFile();
- new ConfigBuilder(configPath,
flinkCommandArgs.getEngineType()).checkConfig();
+ new ConfigBuilder<FlinkEnvironment>(configPath,
flinkCommandArgs.getEngineType()).checkConfig();
LOGGER.info("config OK !");
}
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
index 261e4dd..f3f7a06 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -25,23 +25,26 @@ import org.apache.seatunnel.command.FlinkCommandArgs;
import org.apache.seatunnel.config.ConfigBuilder;
import org.apache.seatunnel.config.PluginType;
import org.apache.seatunnel.env.Execution;
-import org.apache.seatunnel.env.RuntimeEnv;
+import org.apache.seatunnel.flink.FlinkEnvironment;
import java.util.List;
/**
* Used to execute Flink Job.
*/
-public class FlinkTaskExecuteCommand extends
BaseTaskExecuteCommand<FlinkCommandArgs> {
+public class FlinkTaskExecuteCommand extends
BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
@Override
public void execute(FlinkCommandArgs flinkCommandArgs) {
- ConfigBuilder configBuilder = new
ConfigBuilder(flinkCommandArgs.getConfigFile(),
flinkCommandArgs.getEngineType());
+ ConfigBuilder<FlinkEnvironment> configBuilder = new
ConfigBuilder<>(flinkCommandArgs.getConfigFile(),
flinkCommandArgs.getEngineType());
+
+ List<BaseSource<FlinkEnvironment>> sources =
configBuilder.createPlugins(PluginType.SOURCE);
+ List<BaseTransform<FlinkEnvironment>> transforms =
configBuilder.createPlugins(PluginType.TRANSFORM);
+ List<BaseSink<FlinkEnvironment>> sinks =
configBuilder.createPlugins(PluginType.SINK);
+
+ Execution<BaseSource<FlinkEnvironment>,
BaseTransform<FlinkEnvironment>, BaseSink<FlinkEnvironment>, FlinkEnvironment>
+ execution = configBuilder.createExecution();
- List<BaseSource<RuntimeEnv>> sources =
configBuilder.createPlugins(PluginType.SOURCE);
- List<BaseTransform<RuntimeEnv>> transforms =
configBuilder.createPlugins(PluginType.TRANSFORM);
- List<BaseSink<RuntimeEnv>> sinks =
configBuilder.createPlugins(PluginType.SINK);
- Execution execution = configBuilder.createExecution();
baseCheckConfig(sources, transforms, sinks);
prepare(configBuilder.getEnv(), sources, transforms, sinks);
showAsciiLogo();
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
index bfa9f78..6aa3f94 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.command.Command;
import org.apache.seatunnel.command.SparkCommandArgs;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.config.ConfigBuilder;
+import org.apache.seatunnel.spark.SparkEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +43,7 @@ public class SparkConfValidateCommand implements
Command<SparkCommandArgs> {
} else {
confPath = sparkCommandArgs.getConfigFile();
}
- new ConfigBuilder(confPath,
sparkCommandArgs.getEngineType()).checkConfig();
+ new ConfigBuilder<SparkEnvironment>(confPath,
sparkCommandArgs.getEngineType()).checkConfig();
LOGGER.info("config OK !");
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
index f155839..912023f 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
@@ -25,20 +25,23 @@ import org.apache.seatunnel.command.SparkCommandArgs;
import org.apache.seatunnel.config.ConfigBuilder;
import org.apache.seatunnel.config.PluginType;
import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.spark.SparkEnvironment;
import java.util.List;
-public class SparkTaskExecuteCommand extends
BaseTaskExecuteCommand<SparkCommandArgs> {
+public class SparkTaskExecuteCommand extends
BaseTaskExecuteCommand<SparkCommandArgs, SparkEnvironment> {
@Override
public void execute(SparkCommandArgs sparkCommandArgs) {
String confFile = sparkCommandArgs.getConfigFile();
- ConfigBuilder configBuilder = new ConfigBuilder(confFile,
sparkCommandArgs.getEngineType());
- List<BaseSource> sources =
configBuilder.createPlugins(PluginType.SOURCE);
- List<BaseTransform> transforms =
configBuilder.createPlugins(PluginType.TRANSFORM);
- List<BaseSink> sinks = configBuilder.createPlugins(PluginType.SINK);
- Execution execution = configBuilder.createExecution();
+ ConfigBuilder<SparkEnvironment> configBuilder = new
ConfigBuilder<>(confFile, sparkCommandArgs.getEngineType());
+ List<BaseSource<SparkEnvironment>> sources =
configBuilder.createPlugins(PluginType.SOURCE);
+ List<BaseTransform<SparkEnvironment>> transforms =
configBuilder.createPlugins(PluginType.TRANSFORM);
+ List<BaseSink<SparkEnvironment>> sinks =
configBuilder.createPlugins(PluginType.SINK);
+
+ Execution<BaseSource<SparkEnvironment>,
BaseTransform<SparkEnvironment>, BaseSink<SparkEnvironment>, SparkEnvironment>
+ execution = configBuilder.createExecution();
baseCheckConfig(sources, transforms, sinks);
prepare(configBuilder.getEnv(), sources, transforms, sinks);
showAsciiLogo();
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
index 44e0b53..cad5440 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
@@ -47,19 +47,19 @@ import java.util.Objects;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
-public class ConfigBuilder {
+public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigBuilder.class);
private static final String PLUGIN_NAME_KEY = "plugin_name";
private final String configFile;
private final EngineType engine;
- private ConfigPackage configPackage;
+ private final ConfigPackage configPackage;
private final Config config;
private boolean streaming;
private Config envConfig;
private boolean enableHive;
- private final RuntimeEnv env;
+ private final ENVIRONMENT env;
public ConfigBuilder(String configFile, EngineType engine) {
this.configFile = configFile;
@@ -94,7 +94,7 @@ public class ConfigBuilder {
return envConfig;
}
- public RuntimeEnv getEnv() {
+ public ENVIRONMENT getEnv() {
return env;
}
@@ -123,7 +123,7 @@ public class ConfigBuilder {
/**
* create plugin class instance, ignore case.
**/
- private <T extends Plugin<?>> T createPluginInstanceIgnoreCase(String
name, PluginType pluginType) throws Exception {
+ private <T extends Plugin<ENVIRONMENT>> T
createPluginInstanceIgnoreCase(String name, PluginType pluginType) throws
Exception {
if (name.split("\\.").length != 1) {
// canonical class name
return (T) Class.forName(name).newInstance();
@@ -178,7 +178,7 @@ public class ConfigBuilder {
this.createPlugins(PluginType.SINK);
}
- public <T extends Plugin<? extends RuntimeEnv>> List<T>
createPlugins(PluginType type) {
+ public <T extends Plugin<ENVIRONMENT>> List<T> createPlugins(PluginType
type) {
Objects.requireNonNull(type, "PluginType can not be null when create
plugins!");
List<T> basePluginList = new ArrayList<>();
List<? extends Config> configList =
config.getConfigList(type.getType());
@@ -195,34 +195,27 @@ public class ConfigBuilder {
return basePluginList;
}
- private RuntimeEnv createEnv() {
+ private ENVIRONMENT createEnv() {
envConfig = config.getConfig("env");
streaming = checkIsStreaming();
enableHive = checkIsContainHive();
- RuntimeEnv env = null;
+ ENVIRONMENT env;
switch (engine) {
case SPARK:
- env = new SparkEnvironment().setEnableHive(enableHive);
+ env = (ENVIRONMENT) new
SparkEnvironment().setEnableHive(enableHive);
break;
case FLINK:
- env = new FlinkEnvironment();
+ env = (ENVIRONMENT) new
FlinkEnvironment().setStreaming(streaming);
break;
default:
throw new IllegalArgumentException("Engine: " + engine + " is
not supported");
}
- env.setConfig(envConfig);
- env.prepare(streaming);
+ env.setConfig(envConfig).prepare();
return env;
}
- public Execution<
- ? extends BaseSource<? extends RuntimeEnv>,
- ? extends BaseTransform<? extends RuntimeEnv>,
- ? extends BaseSink<? extends RuntimeEnv>> createExecution() {
- Execution<
- ? extends BaseSource<? extends RuntimeEnv>,
- ? extends BaseTransform<? extends RuntimeEnv>,
- ? extends BaseSink<? extends RuntimeEnv>> execution = null;
+ public Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>,
BaseSink<ENVIRONMENT>, ENVIRONMENT> createExecution() {
+ Execution execution = null;
switch (engine) {
case SPARK:
SparkEnvironment sparkEnvironment = (SparkEnvironment) env;
@@ -243,7 +236,7 @@ public class ConfigBuilder {
default:
break;
}
- return execution;
+ return (Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>,
BaseSink<ENVIRONMENT>, ENVIRONMENT>) execution;
}
}