This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a2265a84c [INLONG-9695][Sort] Fix kafka extract node option config building error when using the upsert-kafka connector (#9697)
2a2265a84c is described below

commit 2a2265a84cb803d30e2e721d71733f11588dde99
Author: Xin Gong <genzhedangd...@gmail.com>
AuthorDate: Tue Feb 20 10:22:22 2024 +0800

    [INLONG-9695][Sort] Fix kafka extract node option config building error when using the upsert-kafka connector (#9697)
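
    Background: Flink's upsert-kafka connector does not accept the
    scan.startup.* options, but tableOptions() wrote scan.startup.mode
    unconditionally and the constructor required a non-null
    kafkaScanStartupMode, so option building failed for upsert-kafka
    sources. The scan-startup options are now added only on the plain
    kafka branch, and the null check is dropped.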
---
 .../protocol/node/extract/KafkaExtractNode.java    | 16 +++++-----
 .../inlong/sort/parser/KafkaSqlParseTest.java      | 37 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 200e8ee70c..f595f0f85d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -118,7 +118,7 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
         this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty");
         this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "kafka bootstrapServers is empty");
         this.format = Preconditions.checkNotNull(format, "kafka format is empty");
-        this.kafkaScanStartupMode = Preconditions.checkNotNull(kafkaScanStartupMode, "kafka scanStartupMode is empty");
+        this.kafkaScanStartupMode = kafkaScanStartupMode;
         this.primaryKey = primaryKey;
         this.groupId = groupId;
         if (kafkaScanStartupMode == KafkaScanStartupMode.SPECIFIC_OFFSETS) {
@@ -154,19 +154,19 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
         Map<String, String> options = super.tableOptions();
         options.put(KafkaConstant.TOPIC, topic);
         options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
-        options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
         if (isUpsertKafkaConnector(format, !StringUtils.isEmpty(this.primaryKey))) {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
             options.putAll(format.generateOptions(true));
         } else {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
             options.putAll(format.generateOptions(false));
-        }
-        if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
-            options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
-        }
-        if (StringUtils.isNotBlank(scanTimestampMillis)) {
-            options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
+            options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
+            if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
+                options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
+            }
+            if (StringUtils.isNotBlank(scanTimestampMillis)) {
+                options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
+            }
         }
         if (StringUtils.isNotEmpty(groupId)) {
             options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId);
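
For illustration, a minimal standalone sketch of the branching this hunk introduces (not InLong code; the class name and option values are made up, though the option keys are the standard Flink Kafka connector keys): scan-startup settings apply only to the plain kafka connector, so they are written on that branch alone, and the startup mode may be null for an upsert-kafka source.

    import java.util.HashMap;
    import java.util.Map;

    public class KafkaOptionsSketch {

        // Mirrors the patched logic: scan.startup.* options are written
        // only for the plain kafka connector, never for upsert-kafka.
        static Map<String, String> tableOptions(boolean upsertKafka, String scanStartupMode) {
            Map<String, String> options = new HashMap<>();
            options.put("topic", "topic_input");                           // made-up value
            options.put("properties.bootstrap.servers", "localhost:9092"); // made-up value
            if (upsertKafka) {
                // upsert-kafka rejects scan.startup.* options, so none are added
                options.put("connector", "upsert-kafka");
            } else {
                options.put("connector", "kafka");
                if (scanStartupMode != null) {
                    options.put("scan.startup.mode", scanStartupMode);
                }
            }
            return options;
        }

        public static void main(String[] args) {
            System.out.println(tableOptions(true, null));
            System.out.println(tableOptions(false, "earliest-offset"));
        }
    }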
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
index 79991404af..6c91da6bba 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
@@ -126,6 +126,16 @@ public class KafkaSqlParseTest extends AbstractTestBase {
                 null, "1665198979108");
     }
 
+    private KafkaExtractNode buildUpsertKafkaExtract() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new 
LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+        return new KafkaExtractNode("1", "upsert-kafka_input", fields, null,
+                null, "topic_input", "localhost:9092",
+                new JsonFormat(), null, "id", "groupId",
+                null, null);
+    }
+
     private Node buildMysqlLoadNodeForRawFormat() {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("log", new 
StringFormatInfo()));
         List<FieldRelation> relations = Arrays
@@ -162,4 +172,31 @@ public class KafkaSqlParseTest extends AbstractTestBase {
         ParseResult result = parser.parse();
         Assert.assertTrue(result.tryExecute());
     }
+
+    /**
+     * Test flink sql task where the extract node is upsert-kafka {@link KafkaExtractNode} and the load node is mysql {@link MySqlLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testUpsertKafkaExtractNodeSqlParse() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildUpsertKafkaExtract();
+        Node outputNode = buildMysqlLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, 
outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
 }
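
Note that the new test drives the FlinkSqlParser path end to end for an upsert-kafka extract node without requiring a live Kafka or MySQL: before this fix, building the table options for such a node failed, so Assert.assertTrue(result.tryExecute()) guards against the regression.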
