This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 803941656 [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)
803941656 is described below

commit 8039416562f931fa22c342728ad88e0ff7e0ad36
Author: feat <featzh...@outlook.com>
AuthorDate: Thu Jan 5 10:35:58 2023 +0800

    [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)
    
    Co-authored-by: healchow <healc...@gmail.com>
---
 .../protocol/node/extract/KafkaExtractNode.java    | 53 +++++++++++++++++++---
 .../node/extract/KafkaExtractNodeTest.java         | 22 +++++++++
 2 files changed, 68 insertions(+), 7 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 718c3c21c..6a0501759 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.sort.protocol.node.extract;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map.Entry;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
@@ -38,6 +40,7 @@ import 
org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -133,7 +136,17 @@ public class KafkaExtractNode extends ExtractNode 
implements InlongMetric, Metad
     }
 
     /**
-     * generate table options
+     * Generate table options for Kafka extract node.
+     * <p>
+     * Upsert Kafka stores message keys and values as bytes, so there is no need to specify
+     * the schema or data types for Kafka.
+     * <br>
+     * The messages of Kafka are serialized and deserialized by formats, e.g. csv, json, avro.
+     * <br>
+     * Thus, the data type mapping is determined by specific formats.
+     * <p>
+     * For more details:
+     * <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/">
+     * upsert-kafka</a>
      *
      * @return options
      */
@@ -142,7 +155,12 @@ public class KafkaExtractNode extends ExtractNode 
implements InlongMetric, Metad
         Map<String, String> options = super.tableOptions();
         options.put(KafkaConstant.TOPIC, topic);
         options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, 
bootstrapServers);
-        if (format instanceof JsonFormat || format instanceof AvroFormat || 
format instanceof CsvFormat) {
+
+        boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
+        Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat) 
format).getInnerFormat() : format;
+        if (realFormat instanceof JsonFormat
+                || realFormat instanceof AvroFormat
+                || realFormat instanceof CsvFormat) {
             if (StringUtils.isEmpty(this.primaryKey)) {
                 options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
                 options.put(KafkaConstant.SCAN_STARTUP_MODE, 
kafkaScanStartupMode.getValue());
@@ -152,13 +170,14 @@ public class KafkaExtractNode extends ExtractNode 
implements InlongMetric, Metad
                 if (StringUtils.isNotBlank(scanTimestampMillis)) {
                     options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, 
scanTimestampMillis);
                 }
-                options.putAll(format.generateOptions(false));
+                
options.putAll(delegateInlongFormat(realFormat.generateOptions(false), 
wrapWithInlongMsg));
             } else {
                 options.put(KafkaConstant.CONNECTOR, 
KafkaConstant.UPSERT_KAFKA);
-                options.putAll(format.generateOptions(true));
+                
options.putAll(delegateInlongFormat(realFormat.generateOptions(true), 
wrapWithInlongMsg));
             }
-        } else if (format instanceof CanalJsonFormat || format instanceof 
DebeziumJsonFormat
-                || format instanceof RawFormat) {
+        } else if (realFormat instanceof CanalJsonFormat
+                || realFormat instanceof DebeziumJsonFormat
+                || realFormat instanceof RawFormat) {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
             options.put(KafkaConstant.SCAN_STARTUP_MODE, 
kafkaScanStartupMode.getValue());
             if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
@@ -167,7 +186,7 @@ public class KafkaExtractNode extends ExtractNode 
implements InlongMetric, Metad
             if (StringUtils.isNotBlank(scanTimestampMillis)) {
                 options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, 
scanTimestampMillis);
             }
-            options.putAll(format.generateOptions(false));
+            
options.putAll(delegateInlongFormat(realFormat.generateOptions(false), 
wrapWithInlongMsg));
         } else {
             throw new IllegalArgumentException("kafka extract node format is 
IllegalArgument");
         }
@@ -177,6 +196,26 @@ public class KafkaExtractNode extends ExtractNode 
implements InlongMetric, Metad
         return options;
     }
 
+    /**
+     * Re-keys the options produced by the inner (real) format so they target the
+     * {@code inlong-msg} format.
+     * <p>
+     * When wrapping, the inner {@code format} entry is exposed as {@code inlong-msg.inner.format},
+     * {@code format} itself is set to {@code inlong-msg}, and every other key gets the
+     * {@code inlong-msg.} prefix. When not wrapping, the input map is returned as-is.
+     *
+     * @param realOptions options generated by the inner format
+     * @param wrapWithInlongMsg whether the node's format is an InLongMsg wrapper
+     * @return {@code realOptions} itself when not wrapping, otherwise a new map with re-keyed entries
+     */
+    private Map<String, String> delegateInlongFormat(
+            Map<String, String> realOptions,
+            boolean wrapWithInlongMsg) {
+        if (wrapWithInlongMsg) {
+            Map<String, String> wrapped = new HashMap<>();
+            for (Entry<String, String> option : realOptions.entrySet()) {
+                boolean isFormatKey = "format".equals(option.getKey());
+                if (isFormatKey) {
+                    // the real format moves under the inner key; inlong-msg becomes the outer format
+                    wrapped.put("inlong-msg.inner.format", option.getValue());
+                    wrapped.put("format", "inlong-msg");
+                } else {
+                    wrapped.put("inlong-msg." + option.getKey(), option.getValue());
+                }
+            }
+            return wrapped;
+        }
+        return realOptions;
+    }
+
     @Override
     public String genTableName() {
         return String.format("table_%s", super.getId());
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index 906db8231..cda33fae9 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -26,6 +26,7 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.junit.Assert;
 import org.junit.Test;
@@ -112,4 +113,25 @@ public class KafkaExtractNodeTest extends 
SerializeBaseTest<KafkaExtractNode> {
         }
         Assert.assertTrue(formatEquals);
     }
+
+    /**
+     * Verifies that wrapping a format in {@link InLongMsgFormat} moves the inner format's options
+     * under the {@code inlong-msg.} prefix, and that an unwrapped format keeps its plain keys.
+     */
+    @Test
+    public void testInLongFormat() {
+        KafkaExtractNode kafkaNode = getTestObject();
+        InLongMsgFormat inLongMsgFormat = new InLongMsgFormat(new CsvFormat(), false);
+        kafkaNode.setFormat(inLongMsgFormat);
+
+        Map<String, String> options = kafkaNode.tableOptions();
+        assertEquals("inlong-msg", options.get("format"));
+        assertEquals("csv", options.get("inlong-msg.inner.format"));
+        assertEquals("true", options.get("inlong-msg.csv.ignore-parse-errors"));
+        // the inner option must be moved under the prefix, not also left at top level
+        Assert.assertNull(options.get("csv.ignore-parse-errors"));
+
+        kafkaNode.setFormat(new CsvFormat());
+        Map<String, String> csvOptions = kafkaNode.tableOptions();
+        assertEquals("csv", csvOptions.get("format"));
+        assertEquals("true", csvOptions.get("csv.ignore-parse-errors"));
+        // a plain format must not pick up any inlong-msg wrapping
+        Assert.assertNull(csvOptions.get("inlong-msg.inner.format"));
+    }
 }

Reply via email to