This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 99189beda [INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627) 99189beda is described below commit 99189bedacca354e5244be251235a32ad8ccf1e2 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Tue Aug 23 14:19:44 2022 +0800 [INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627) Co-authored-by: thesumery <158971...@qq.com> --- .../org/apache/inlong/sort/tests/KafkaE2ECase.java | 175 ++++--- .../sort/tests/utils/FlinkContainerTestEnv.java | 38 +- .../sort/tests/utils/PlaceholderResolver.java | 150 ++++++ .../apache/inlong/sort/tests/utils/TestUtils.java | 16 + .../test/resources/env/kafka_test_kafka_init.txt | 1 + .../test/resources/env/kafka_test_mysql_init.txt | 19 + .../src/test/resources/groupFile/kafka_test.json | 562 +++++++++++++++++++++ 7 files changed, 894 insertions(+), 67 deletions(-) diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java index 2f9b2128f..2f82b7ac2 100644 --- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java +++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java @@ -20,9 +20,9 @@ package org.apache.inlong.sort.tests; import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; import org.apache.inlong.sort.tests.utils.JdbcProxy; +import org.apache.inlong.sort.tests.utils.PlaceholderResolver; import org.apache.inlong.sort.tests.utils.TestUtils; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; import org.junit.ClassRule; import org.junit.Test; import org.slf4j.Logger; @@ -34,6 +34,8 @@ import org.testcontainers.utility.DockerImageName; import java.io.IOException; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.sql.Connection; @@ -41,9 +43,10 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * End-to-end tests for sort-connector-kafka uber jar. @@ -56,17 +59,6 @@ public class KafkaE2ECase extends FlinkContainerTestEnv { private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar"); private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar"); // Can't use getResource("xxx").getPath(), windows will don't know that path - private static final String sqlFile; - - static { - try { - sqlFile = Paths.get(KafkaE2ECase.class.getResource("/flinkSql/kafka_test.sql").toURI()).toString(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } - - private static final String TOPIC = "test-topic"; @ClassRule public static final KafkaContainer KAFKA = @@ -76,33 +68,47 @@ public class KafkaE2ECase extends FlinkContainerTestEnv { .withEmbeddedZookeeper() .withLogConsumer(new Slf4jLogConsumer(LOG)); - @Before - public void setup() { - initializeMysqlTable(); - initializeKafkaTable(); - } - - @After - public void teardown() { + @AfterClass + public static void teardown() { if (KAFKA != null) { KAFKA.stop(); } } - private void initializeKafkaTable() { - List<String> commands = new ArrayList<>(); - commands.add("kafka-topics"); - commands.add("--create"); - commands.add("--topic"); - commands.add(TOPIC); - commands.add("--replication-factor 1"); - commands.add("--partitions 1"); - commands.add("--zookeeper"); - commands.add("localhost:" + KafkaContainer.ZOOKEEPER_PORT); + private Path getSql(String fileName, Map<String, Object> properties) { + try { + Path file = Paths.get(KafkaE2ECase.class.getResource("/flinkSql/" + fileName).toURI()); + return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private Path getGroupFile(String fileName, Map<String, Object> properties) { try { - LOG.info(String.join(" ", commands)); - ExecResult result = KAFKA.execInContainer("bash", "-c", String.join(" ", commands)); - LOG.info(result.getStdout()); + Path file = Paths.get(KafkaE2ECase.class.getResource("/groupFile/" + fileName).toURI()); + return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private String getCreateStatement(String fileName, Map<String, Object> properties) { + try { + Path file = Paths.get(KafkaE2ECase.class.getResource("/env/" + fileName).toURI()); + return PlaceholderResolver.getDefaultResolver().resolveByMap( + new String(Files.readAllBytes(file), StandardCharsets.UTF_8), + properties); + } catch (IOException | URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private void initializeKafkaTable(String fileName, Map<String, Object> properties) { + try { + String createKafkaStatement = getCreateStatement(fileName, properties); + ExecResult result = KAFKA.execInContainer("bash", "-c", createKafkaStatement); + LOG.info("Create kafka topic: {}, std: {}", createKafkaStatement, result.getStdout()); if (result.getExitCode() != 0) { throw new RuntimeException("Init kafka topic failed. Exit code:" + result.getExitCode()); } @@ -111,43 +117,40 @@ public class KafkaE2ECase extends FlinkContainerTestEnv { } } - private void initializeMysqlTable() { + private void initializeMysqlTable(String fileName, Map<String, Object> properties) { try (Connection conn = DriverManager.getConnection(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword()); Statement stat = conn.createStatement()) { - stat.execute( - "CREATE TABLE test_input (\n" - + " id INTEGER NOT NULL,\n" - + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" - + " description VARCHAR(512),\n" - + " weight FLOAT,\n" - + " enum_c enum('red', 'white') default 'red', -- test some complex types as well,\n" - + " json_c JSON, -- because we use additional dependencies to deserialize complex types.\n" - + " point_c POINT\n" - + ");"); - stat.execute( - "CREATE TABLE test_output (\n" - + " id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n" - + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" - + " description VARCHAR(512),\n" - + " weight FLOAT,\n" - + " enum_c VARCHAR(255),\n" - + " json_c VARCHAR(255),\n" - + " point_c VARCHAR(255)\n" - + ");"); + String createMysqlStatement = getCreateStatement(fileName, properties); + stat.execute(createMysqlStatement); } catch (SQLException e) { throw new RuntimeException(e); } } - /** * Test flink sql mysql cdc to hive * * @throws Exception The exception may throws when execute the case */ @Test - public void testKafka() throws Exception { + public void testKafkaWithSqlFile() throws Exception { + final String topic = "test-topic"; + final String mysqlInputTable = "test_input"; + final String mysqlOutputTable = "test_output"; + initializeMysqlTable("kafka_test_mysql_init.txt", new HashMap() { + { + put("MYSQL_INPUT_TABLE", mysqlInputTable); + put("MYSQL_OUTPUT_TABLE", mysqlOutputTable); + } + }); + initializeKafkaTable("kafka_test_kafka_init.txt", new HashMap() { + { + put("TOPIC", topic); + put("ZOOKEEPER_PORT", KafkaContainer.ZOOKEEPER_PORT); + } + }); + String sqlFile = getSql("kafka_test.sql", new HashMap<>()).toString(); submitSQLJob(sqlFile, kafkaJar, jdbcJar, mysqlJar, mysqlJdbcJar); waitUntilJobRunning(Duration.ofSeconds(30)); @@ -179,4 +182,60 @@ public class KafkaE2ECase extends FlinkContainerTestEnv { 60000L); } + @Test + public void testKafkaWithGroupFile() throws Exception { + final String topic = "test_topic_for_group_file"; + final String mysqlInputTable = "test_input_for_group_file"; + final String mysqlOutputTable = "test_output_for_group_file"; + initializeMysqlTable("kafka_test_mysql_init.txt", new HashMap() { + { + put("MYSQL_INPUT_TABLE", mysqlInputTable); + put("MYSQL_OUTPUT_TABLE", mysqlOutputTable); + } + }); + initializeKafkaTable("kafka_test_kafka_init.txt", new HashMap() { + { + put("TOPIC", topic); + put("ZOOKEEPER_PORT", KafkaContainer.ZOOKEEPER_PORT); + } + }); + String groupFile = getGroupFile("kafka_test.json", new HashMap() { + { + put("MYSQL_INPUT_TABLE", mysqlInputTable); + put("MYSQL_OUTPUT_TABLE", mysqlOutputTable); + put("TOPIC", topic); + put("ZOOKEEPER_PORT", KafkaContainer.ZOOKEEPER_PORT); + } + }).toString(); + submitGroupFileJob(groupFile, kafkaJar, jdbcJar, mysqlJar, mysqlJdbcJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + + // generate input + try (Connection conn = + DriverManager.getConnection(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword()); + Statement stat = conn.createStatement()) { + stat.execute( + "INSERT INTO test_input_for_group_file " + + "VALUES (1,'jacket','water resistent white wind breaker',0.2, null, null, null);"); + stat.execute( + "INSERT INTO test_input_for_group_file " + + "VALUES (2,'scooter','Big 2-wheel scooter ',5.18, null, null, null);"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + // validate output + JdbcProxy proxy = + new JdbcProxy(MYSQL.getJdbcUrl(), MYSQL.getUsername(), MYSQL.getPassword(), MYSQL_DRIVER_CLASS); + List<String> expectResult = + Arrays.asList( + "1,jacket,water resistent white wind breaker,0.2,null,null,null", + "2,scooter,Big 2-wheel scooter ,5.18,null,null,null"); + proxy.checkResultWithTimeout( + expectResult, + mysqlOutputTable, + 7, + 60000L); + } } diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java index b7eba6c4c..914ac7285 100644 --- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java +++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java @@ -29,8 +29,8 @@ import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.table.api.ValidationException; import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.rules.TemporaryFolder; @@ -102,10 +102,10 @@ public abstract class FlinkContainerTestEnv extends TestLogger { public final TemporaryFolder temporaryFolder = new TemporaryFolder(); @Nullable - private RestClusterClient<StandaloneClusterId> restClusterClient; + private static RestClusterClient<StandaloneClusterId> restClusterClient; - private GenericContainer<?> jobManager; - private GenericContainer<?> taskManager; + private static GenericContainer<?> jobManager; + private static GenericContainer<?> taskManager; // ---------------------------------------------------------------------------------------- @@ -121,12 +121,13 @@ public abstract class FlinkContainerTestEnv extends TestLogger { .withDatabaseName("test") .withUsername("flinkuser") .withPassword("flinkpw") + .withUrlParam("allowMultiQueries", "true") .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) .withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG)); - @Before - public void before() { + @BeforeClass + public static void before() { LOG.info("Starting containers..."); jobManager = new GenericContainer<>("flink:1.13.5-scala_2.11") @@ -151,8 +152,8 @@ public abstract class FlinkContainerTestEnv extends TestLogger { LOG.info("Containers are started."); } - @After - public void after() { + @AfterClass + public static void after() { if (restClusterClient != null) { restClusterClient.close(); } @@ -191,6 +192,25 @@ public abstract class FlinkContainerTestEnv extends TestLogger { } } + public void submitGroupFileJob(String groupFile, Path... jars) + throws IOException, InterruptedException { + final List<String> commands = new ArrayList<>(); + String containerGroupFile = copyToContainerTmpPath(jobManager, groupFile); + commands.add(FLINK_BIN + "/flink run -d"); + commands.add("-c org.apache.inlong.sort.Entrance"); + commands.add(copyToContainerTmpPath(jobManager, constructDistJar(jars))); + commands.add("--group.info.file"); + commands.add(containerGroupFile); + + ExecResult execResult = + jobManager.execInContainer("bash", "-c", String.join(" ", commands)); + LOG.info(execResult.getStdout()); + LOG.error(execResult.getStderr()); + if (execResult.getExitCode() != 0) { + throw new AssertionError("Failed when submitting the SQL job."); + } + } + /** * Get {@link RestClusterClient} connected to this FlinkContainer. * diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java new file mode 100644 index 000000000..277124fe0 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * A file placeholder replacement tool. + */ +public class PlaceholderResolver { + /** + * Default placeholder prefix + */ + public static final String DEFAULT_PLACEHOLDER_PREFIX = "${"; + + /** + * Default placeholder suffix + */ + public static final String DEFAULT_PLACEHOLDER_SUFFIX = "}"; + + /** + * Default singleton resolver + */ + private static PlaceholderResolver defaultResolver = new PlaceholderResolver(); + + /** + * Placeholder prefix + */ + private String placeholderPrefix = DEFAULT_PLACEHOLDER_PREFIX; + + /** + * Placeholder suffix + */ + private String placeholderSuffix = DEFAULT_PLACEHOLDER_SUFFIX; + + private PlaceholderResolver() { + + } + + private PlaceholderResolver(String placeholderPrefix, String placeholderSuffix) { + this.placeholderPrefix = placeholderPrefix; + this.placeholderSuffix = placeholderSuffix; + } + + public static PlaceholderResolver getDefaultResolver() { + return defaultResolver; + } + + public static PlaceholderResolver getResolver(String placeholderPrefix, String placeholderSuffix) { + return new PlaceholderResolver(placeholderPrefix, placeholderSuffix); + } + + /** + * Replace template string with special placeholder according to replace function. + * @param content template string with special placeholder + * @param rule placeholder replacement rule + * @return new replaced string + */ + public String resolveByRule(String content, Function<String, String> rule) { + int start = content.indexOf(this.placeholderPrefix); + if (start == -1) { + return content; + } + StringBuilder result = new StringBuilder(content); + while (start != -1) { + int end = result.indexOf(this.placeholderSuffix, start); + // get placeholder actual value (e.g. ${id}, get the value represent id) + String placeholder = result.substring(start + this.placeholderPrefix.length(), end); + // replace placeholder value + String replaceContent = placeholder.trim().isEmpty() ? "" : rule.apply(placeholder); + result.replace(start, end + this.placeholderSuffix.length(), replaceContent); + start = result.indexOf(this.placeholderPrefix, start + replaceContent.length()); + } + return result.toString(); + } + + /** + * Replace template string with special placeholder according to replace function. + * @param file template file with special placeholder + * @param rule placeholder replacement rule + * @return new replaced string + */ + public Path resolveByRule(Path file, Function<String, String> rule) { + try { + List<String> newContents = Files.readAllLines(file, StandardCharsets.UTF_8) + .stream() + .map(content -> resolveByRule(content, rule)) + .collect(Collectors.toList()); + Path newPath = Paths.get(file.getParent().toString(), file.getFileName() + "$"); + Files.write(newPath, String.join(System.lineSeparator(), newContents).getBytes(StandardCharsets.UTF_8)); + return newPath; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Replace template string with special placeholder according to properties file. + * Key is the content of the placeholder <br/><br/> + * e.g: content = product:${id}:detail:${did}<br/> + * valueMap = id -> 1; pid -> 2<br/> + * return: product:1:detail:2<br/> + * + * @param content template string with special placeholder + * @param valueMap placeholder replacement map + * @return new replaced string + */ + public String resolveByMap(String content, final Map<String, Object> valueMap) { + return resolveByRule(content, placeholderValue -> String.valueOf(valueMap.get(placeholderValue))); + } + + /** + * Replace template string with special placeholder according to properties file. + * Key is the content of the placeholder <br/><br/> + * e.g: content = product:${id}:detail:${did}<br/> + * valueMap = id -> 1; pid -> 2<br/> + * return: product:1:detail:2<br/> + * + * @param file template string with special placeholder + * @param valueMap placeholder replacement map + * @return new replaced string + */ + public Path resolveByMap(Path file, final Map<String, Object> valueMap) { + return resolveByRule(file, placeholderValue -> String.valueOf(valueMap.get(placeholderValue))); + } +} diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java index beeb5edd9..a73d7afec 100644 --- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java +++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java @@ -18,17 +18,23 @@ package org.apache.inlong.sort.tests.utils; +import org.junit.Test; + import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.junit.Assert.assertEquals; + /** * Test util for test container. */ @@ -106,4 +112,14 @@ public class TestUtils { return value == null ? defaultValue : converter.apply(value); } } + + @Test + public void testReplaceholder() { + String before = "today is ${date}, today weather is ${weather}"; + Map<String, Object> maps = new HashMap<>(); + maps.put("date", "2022.08.05"); + maps.put("weather", "rain"); + String after = PlaceholderResolver.getDefaultResolver().resolveByMap(before, maps); + assertEquals(after, "today is 2022.08.05, today weather is rain"); + } } diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt new file mode 100644 index 000000000..b2f31d78f --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt @@ -0,0 +1 @@ +kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT} \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt new file mode 100644 index 000000000..c7b1948a1 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt @@ -0,0 +1,19 @@ +CREATE TABLE ${MYSQL_INPUT_TABLE} ( + id INTEGER NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT, + enum_c enum('red', 'white') default 'red', -- test some complex types as well, + json_c JSON, -- because we use additional dependencies to deserialize complex types. + point_c POINT +); + +CREATE TABLE ${MYSQL_OUTPUT_TABLE} ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT, + enum_c VARCHAR(255), + json_c VARCHAR(255), + point_c VARCHAR(255) +); \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json b/inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json new file mode 100644 index 000000000..54931a186 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json @@ -0,0 +1,562 @@ +{ + "groupId": "t323a2b0f-614e-4b04-82d4-3880427bc3e8", + "streams": [ + { + "streamId": "1", + "nodes": [ + { + "type": "mysqlExtract", + "id": "1", + "name": "mysql_input", + "fields": [ + { + "type": "field", + "name": "id", + "formatInfo": { + "type": "int" + } + }, + { + "type": "field", + "name": "name", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "description", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "weight", + "formatInfo": { + "type": "decimal", + "precision": 10, + "scale": 3 + } + }, + { + "type": "field", + "name": "enum_c", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "json_c", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "point_c", + "formatInfo": { + "type": "string" + } + } + ], + "primaryKey": "id", + "tableNames": [ + "${MYSQL_INPUT_TABLE}" + ], + "hostname": "mysql", + "username": "inlong", + "password": "inlong", + "database": "test", + "port": 3306, + "incrementalSnapshotEnabled": false, + "serverTimeZone": "Asia/Shanghai" + }, + { + "type": "kafkaLoad", + "id": "2", + "name": "kafka_load", + "fields": [ + { + "type": "field", + "name": "id", + "formatInfo": { + "type": "int" + } + }, + { + "type": "field", + "name": "name", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "description", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "weight", + "formatInfo": { + "type": "decimal", + "precision": 10, + "scale": 3 + } + }, + { + "type": "field", + "name": "enum_c", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "json_c", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "point_c", + "formatInfo": { + "type": "string" + } + } + ], + "fieldRelations": [ + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "id", + "formatInfo": { + "type": "int" + } + }, + "outputField": { + "type": "field", + "name": "id", + "formatInfo": { + "type": "int" + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "name", + "formatInfo": { + "type": "string" + } + }, + "outputField": { + "type": "field", + "name": "name", + "formatInfo": { + "type": "string" + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "description", + "formatInfo": { + "type": "string" + } + }, + "outputField": { + "type": "field", + "name": "description", + "formatInfo": { + "type": "string" + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "weight", + "formatInfo": { + "type": "decimal", + "precision": 10, + "scale": 3 + } + }, + "outputField": { + "type": "field", + "name": "weight", + "formatInfo": { + "type": "decimal", + "precision": 10, + "scale": 3 + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "enum_c", + "formatInfo": { + "type": "string" + } + }, + "outputField": { + "type": "field", + "name": "enum_c", + "formatInfo": { + "type": "string" + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "json_c", + "formatInfo": { + "type": "string" + } + }, + "outputField": { + "type": "field", + "name": "json_c", + "formatInfo": { + "type": "string" + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "point_c", + "formatInfo": { + "type": "string" + } + }, + "outputField": { + "type": "field", + "name": "point_c", + "formatInfo": { + "type": "string" + } + } + } + ], + "primaryKey": "id", + "topic": "${TOPIC}", + "bootstrapServers": "kafka:9092", + "format": { + "type": "canalJsonFormat", + "ignoreParseErrors": true, + "timestampFormatStandard": "SQL", + "mapNullKeyMode": "DROP", + "mapNullKeyLiteral": "null", + "encodeDecimalAsPlainNumber": true + }, + "sinkParallelism": 1, + "properties": {} + } + ], + "relations": [ + { + "type": "baseRelation", + "inputs": [ + "1" + ], + "outputs": [ + "2" + ] + } + ] + }, + { + "streamId": "2", + "nodes": [ + { + "type": "kafkaExtract", + "id": "3", + "name": "kafka_extract", + "fields": [ + { + "type": "field", + "name": "id", + "formatInfo": { + "type": "int" + } + }, + { + "type": "field", + "name": "name", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "description", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "weight", + "formatInfo": { + "type": "decimal", + "precision": 10, + "scale": 3 + } + }, + { + "type": "field", + "name": "enum_c", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "json_c", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "point_c", + "formatInfo": { + "type": "string" + } + } + ], + "primaryKey": "id", + "topic": "${TOPIC}", + "bootstrapServers": "kafka:9092", + "format": { + "type": "canalJsonFormat", + "ignoreParseErrors": true, + "timestampFormatStandard": "SQL", + "mapNullKeyMode": "DROP", + "mapNullKeyLiteral": "null", + "encodeDecimalAsPlainNumber": true + }, + "scanStartupMode": "EARLIEST_OFFSET", + "groupId": null, + "scanSpecificOffsets": null + }, + { + "type": "mysqlLoad", + "id": "4", + "name": "mysql_output", + "fields": [ + { + "type": "field", + "name": "id", + "formatInfo": { + "type": "int" + } + }, + { + "type": "field", + "name": "name", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "description", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "weight", + "formatInfo": { + "type": "decimal", + "precision": 10, + "scale": 3 + } + }, + { + "type": "field", + "name": "enum_c", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "json_c", + "formatInfo": { + "type": "string" + } + }, + { + "type": "field", + "name": "point_c", + "formatInfo": { + "type": "string" + } + } + ], + "fieldRelations": [ + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "id", + "formatInfo": { + "type": "int" + } + }, + "outputField": { + "type": "field", + "name": "id", + "formatInfo": { + "type": "int" + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "name", + "formatInfo": { + "type": "string" + } + }, + "outputField": { + "type": "field", + "name": "name", + "formatInfo": { + "type": "string" + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "description", + "formatInfo": { + "type": "string" + } + }, + "outputField": { + "type": "field", + "name": "description", + "formatInfo": { + "type": "string" + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "weight", + "formatInfo": { + "type": "decimal", + "precision": 10, + "scale": 3 + } + }, + "outputField": { + "type": "field", + "name": "weight", + "formatInfo": { + "type": "decimal", + "precision": 10, + "scale": 3 + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "enum_c", + "formatInfo": { + "type": "string" + } + }, + "outputField": { + "type": "field", + "name": "enum_c", + "formatInfo": { + "type": "string" + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "json_c", + "formatInfo": { + "type": "string" + } + }, + "outputField": { + "type": "field", + "name": "json_c", + "formatInfo": { + "type": "string" + } + } + }, + { + "type": "fieldRelation", + "inputField": { + "type": "field", + "name": "point_c", + "formatInfo": { + "type": "string" + } + }, + "outputField": { + "type": "field", + "name": "point_c", + "formatInfo": { + "type": "string" + } + } + } + ], + "url": "jdbc:mysql://mysql:3306/test", + "username": "inlong", + "password": "inlong", + "tableName": "${MYSQL_OUTPUT_TABLE}", + "primaryKey": "id" + } + ], + "relations": [ + { + "type": "baseRelation", + "inputs": [ + "3" + ], + "outputs": [ + "4" + ] + } + ] + } + ] +} \ No newline at end of file