This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 803941656 [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107) 803941656 is described below commit 8039416562f931fa22c342728ad88e0ff7e0ad36 Author: feat <featzh...@outlook.com> AuthorDate: Thu Jan 5 10:35:58 2023 +0800 [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107) Co-authored-by: healchow <healc...@gmail.com> --- .../protocol/node/extract/KafkaExtractNode.java | 53 +++++++++++++++++++--- .../node/extract/KafkaExtractNodeTest.java | 22 +++++++++ 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java index 718c3c21c..6a0501759 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java @@ -18,6 +18,8 @@ package org.apache.inlong.sort.protocol.node.extract; import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map.Entry; import lombok.Data; import lombok.EqualsAndHashCode; import org.apache.commons.lang3.StringUtils; @@ -38,6 +40,7 @@ import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat; import org.apache.inlong.sort.protocol.node.format.CsvFormat; import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat; import org.apache.inlong.sort.protocol.node.format.Format; +import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat; import org.apache.inlong.sort.protocol.node.format.JsonFormat; import org.apache.inlong.sort.protocol.node.format.RawFormat; import org.apache.inlong.sort.protocol.transformation.WatermarkField; @@ -133,7 +136,17 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad } /** 
- * generate table options + * Generate table options for Kafka extract node. + * <p/> + * Upsert Kafka stores message keys and values as bytes, so there is no need to specify the schema or data types for Kafka. + * <br/> + * Kafka messages are serialized and deserialized by formats, e.g. csv, json, avro. + * <br/> + * Thus, the data type mapping is determined by the specific format. + * <p/> + * For more details: + * <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/"> + * upsert-kafka</a> * * @return options */ @@ -142,7 +155,12 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad Map<String, String> options = super.tableOptions(); options.put(KafkaConstant.TOPIC, topic); options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers); - if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) { + + boolean wrapWithInlongMsg = format instanceof InLongMsgFormat; + Format realFormat = wrapWithInlongMsg ? 
((InLongMsgFormat) format).getInnerFormat() : format; + if (realFormat instanceof JsonFormat + || realFormat instanceof AvroFormat + || realFormat instanceof CsvFormat) { if (StringUtils.isEmpty(this.primaryKey)) { options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA); options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue()); @@ -152,13 +170,14 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad if (StringUtils.isNotBlank(scanTimestampMillis)) { options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis); } - options.putAll(format.generateOptions(false)); + options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg)); } else { options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA); - options.putAll(format.generateOptions(true)); + options.putAll(delegateInlongFormat(realFormat.generateOptions(true), wrapWithInlongMsg)); } - } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat - || format instanceof RawFormat) { + } else if (realFormat instanceof CanalJsonFormat + || realFormat instanceof DebeziumJsonFormat + || realFormat instanceof RawFormat) { options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA); options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue()); if (StringUtils.isNotEmpty(scanSpecificOffsets)) { @@ -167,7 +186,7 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad if (StringUtils.isNotBlank(scanTimestampMillis)) { options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis); } - options.putAll(format.generateOptions(false)); + options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg)); } else { throw new IllegalArgumentException("kafka extract node format is IllegalArgument"); } @@ -177,6 +196,26 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad return options; } + private 
Map<String, String> delegateInlongFormat( + Map<String, String> realOptions, + boolean wrapWithInlongMsg) { + if (!wrapWithInlongMsg) { + return realOptions; + } + Map<String, String> options = new HashMap<>(); + for (Entry<String, String> entry : realOptions.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if ("format".equals(key)) { + options.put("format", "inlong-msg"); + options.put("inlong-msg.inner.format", value); + } else { + options.put("inlong-msg." + key, value); + } + } + return options; + } + @Override public String genTableName() { return String.format("table_%s", super.getId()); diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java index 906db8231..cda33fae9 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java @@ -26,6 +26,7 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode; import org.apache.inlong.sort.protocol.node.format.CsvFormat; +import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat; import org.apache.inlong.sort.protocol.node.format.RawFormat; import org.junit.Assert; import org.junit.Test; @@ -112,4 +113,25 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<KafkaExtractNode> { } Assert.assertTrue(formatEquals); } + + @Test + public void testInLongFormat() { + List<FieldInfo> fields = Arrays.asList( + new FieldInfo("name", new StringFormatInfo()), + new FieldInfo("age", new IntFormatInfo())); + + KafkaExtractNode kafkaNode = getTestObject(); + InLongMsgFormat inLongMsgFormat = new InLongMsgFormat(new 
CsvFormat(), false); + kafkaNode.setFormat(inLongMsgFormat); + + Map<String, String> options = kafkaNode.tableOptions(); + assertEquals("inlong-msg", options.get("format")); + assertEquals("csv", options.get("inlong-msg.inner.format")); + assertEquals("true", options.get("inlong-msg.csv.ignore-parse-errors")); + + kafkaNode.setFormat(new CsvFormat()); + Map<String, String> csvOptions = kafkaNode.tableOptions(); + assertEquals("csv", csvOptions.get("format")); + assertEquals("true", csvOptions.get("csv.ignore-parse-errors")); + } }