This is an automated email from the ASF dual-hosted git repository. pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 2a2265a84c [INLONG-9695][Sort] Fix kafka extract node option config building error when use upsert-kafka connector (#9697) 2a2265a84c is described below commit 2a2265a84cb803d30e2e721d71733f11588dde99 Author: Xin Gong <genzhedangd...@gmail.com> AuthorDate: Tue Feb 20 10:22:22 2024 +0800 [INLONG-9695][Sort] Fix kafka extract node option config building error when use upsert-kafka connector (#9697) --- .../protocol/node/extract/KafkaExtractNode.java | 16 +++++----- .../inlong/sort/parser/KafkaSqlParseTest.java | 37 ++++++++++++++++++++++ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java index 200e8ee70c..f595f0f85d 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java @@ -118,7 +118,7 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty"); this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "kafka bootstrapServers is empty"); this.format = Preconditions.checkNotNull(format, "kafka format is empty"); - this.kafkaScanStartupMode = Preconditions.checkNotNull(kafkaScanStartupMode, "kafka scanStartupMode is empty"); + this.kafkaScanStartupMode = kafkaScanStartupMode; this.primaryKey = primaryKey; this.groupId = groupId; if (kafkaScanStartupMode == KafkaScanStartupMode.SPECIFIC_OFFSETS) { @@ -154,19 +154,19 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad Map<String, String> options = super.tableOptions(); options.put(KafkaConstant.TOPIC, topic); options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers); - options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue()); if (isUpsertKafkaConnector(format, !StringUtils.isEmpty(this.primaryKey))) { options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA); options.putAll(format.generateOptions(true)); } else { options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA); options.putAll(format.generateOptions(false)); - } - if (StringUtils.isNotEmpty(scanSpecificOffsets)) { - options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets); - } - if (StringUtils.isNotBlank(scanTimestampMillis)) { - options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis); + options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue()); + if (StringUtils.isNotEmpty(scanSpecificOffsets)) { + options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets); + } + if (StringUtils.isNotBlank(scanTimestampMillis)) { + options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis); + } } if (StringUtils.isNotEmpty(groupId)) { options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId); diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java index 79991404af..6c91da6bba 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java @@ -126,6 +126,16 @@ public class KafkaSqlParseTest extends AbstractTestBase { null, "1665198979108"); } + private KafkaExtractNode buildUpsertKafkaExtract() { + List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()), + new FieldInfo("name", new StringFormatInfo()), + new FieldInfo("age", new IntFormatInfo())); + return new KafkaExtractNode("1", "upsert-kafka_input", fields, null, + null, "topic_input", "localhost:9092", + new JsonFormat(), null, "id", "groupId", + null, null); + } + private Node buildMysqlLoadNodeForRawFormat() { List<FieldInfo> fields = Arrays.asList(new FieldInfo("log", new StringFormatInfo())); List<FieldRelation> relations = Arrays @@ -162,4 +172,31 @@ public class KafkaSqlParseTest extends AbstractTestBase { ParseResult result = parser.parse(); Assert.assertTrue(result.tryExecute()); } + + /** + * Test flink sql task for extract is upsert-kafka {@link KafkaExtractNode} and load is mysql {@link MySqlLoadNode} + * + * @throws Exception The exception may be thrown when executing + */ + @Test + public void testUpsertKafkaExtractNodeSqlParse() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(10000); + env.disableOperatorChaining(); + EnvironmentSettings settings = EnvironmentSettings + .newInstance() + .inStreamingMode() + .build(); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); + Node inputNode = buildUpsertKafkaExtract(); + Node outputNode = buildMysqlLoadNode(); + StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode), + Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode), + Collections.singletonList(outputNode)))); + GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo)); + FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo); + ParseResult result = parser.parse(); + Assert.assertTrue(result.tryExecute()); + } }