This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3cfc23595c [BUG][Zeta] Multiple sink actions of the same type have the
same name (#5499)
3cfc23595c is described below
commit 3cfc23595c0c7aa312a63d97b8ec11d2521dfae3
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Mon Sep 18 16:44:13 2023 +0800
[BUG][Zeta] Multiple sink actions of the same type have the same name
(#5499)
---
.../client/MultipleTableJobConfigParserTest.java | 16 ++++
.../resources/batch_fakesource_to_two_file.conf | 89 ++++++++++++++++++++++
.../engine/core/parse/JobConfigParser.java | 7 +-
.../core/parse/MultipleTableJobConfigParser.java | 2 +-
4 files changed, 112 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index e5faaea25c..d869713550 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -87,4 +87,20 @@ public class MultipleTableJobConfigParserTest {
Assertions.assertEquals(3,
actions.get(0).getUpstream().get(1).getParallelism());
Assertions.assertEquals(3, actions.get(0).getParallelism());
}
+
+ @Test
+ public void testMultipleSinkName() {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath =
TestUtils.getResource("/batch_fakesource_to_two_file.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setJobContext(new JobContext());
+ MultipleTableJobConfigParser jobConfigParser =
+ new MultipleTableJobConfigParser(filePath, new IdGenerator(),
jobConfig);
+ ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
+ List<Action> actions = parse.getLeft();
+ Assertions.assertEquals(2, actions.size());
+
+ Assertions.assertEquals("Sink[0]-LocalFile-default-identifier",
actions.get(0).getName());
+ Assertions.assertEquals("Sink[1]-LocalFile-default-identifier",
actions.get(1).getName());
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
new file mode 100644
index 0000000000..7ff3c21f78
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ execution.checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 3
+ }
+
+ FakeSource {
+ result_table_name = "fake2"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 3
+ }
+}
+
+transform {
+}
+
+sink {
+ LocalFile {
+ path = "/tmp/hive/warehouse/test2"
+ field_delimiter = "\t"
+ row_delimiter = "\n"
+ partition_by = ["age"]
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "text"
+ sink_columns = ["name", "age"]
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ save_mode = "error",
+ source_table_name = ["fake", "fake2"]
+ }
+
+ LocalFile {
+ path = "/tmp/hive/warehouse/test2"
+ field_delimiter = "\t"
+ row_delimiter = "\n"
+ partition_by = ["age"]
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "text"
+ sink_columns = ["name", "age"]
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ save_mode = "error",
+ source_table_name = ["fake"]
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 09bae74f5a..46c28b6397 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -130,6 +130,7 @@ public class JobConfigParser {
}
public List<SinkAction<?, ?, ?, ?>> parseSinks(
+ int configIndex,
List<List<Tuple2<CatalogTable, Action>>> inputVertices,
Config sinkConfig,
JobConfig jobConfig) {
@@ -145,6 +146,7 @@ public class JobConfigParser {
checkProducedTypeEquals(inputActions);
SinkAction<?, ?, ?, ?> sinkAction =
parseSink(
+ configIndex,
sinkConfig,
jobConfig,
spareParallelism,
@@ -164,6 +166,7 @@ public class JobConfigParser {
int parallelism = inputAction.getParallelism();
SinkAction<?, ?, ?, ?> sinkAction =
parseSink(
+ configIndex,
sinkConfig,
jobConfig,
parallelism,
@@ -176,6 +179,7 @@ public class JobConfigParser {
}
private SinkAction<?, ?, ?, ?> parseSink(
+ int configIndex,
Config config,
JobConfig jobConfig,
int parallelism,
@@ -198,7 +202,8 @@ public class JobConfigParser {
handleSaveMode(sink);
}
final String actionName =
- createSinkActionName(0, tuple.getLeft().getPluginName(),
getTableName(config));
+ createSinkActionName(
+ configIndex, tuple.getLeft().getPluginName(),
getTableName(config));
final SinkAction action =
new SinkAction<>(
idGenerator.getNextId(),
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index ee2505286f..c83ceade12 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -531,7 +531,7 @@ public class MultipleTableJobConfigParser {
factoryId,
(factory) -> factory.createSink(null));
if (fallback) {
- return fallbackParser.parseSinks(inputVertices, sinkConfig,
jobConfig);
+ return fallbackParser.parseSinks(configIndex, inputVertices,
sinkConfig, jobConfig);
}
Map<TablePath, CatalogTable> tableMap =