This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 03933e4072 [INLONG-9783][Sort] Add compatibility processing of tid to streamId changes in the message deserialization base class (#9785) 03933e4072 is described below commit 03933e40722a15a50225c52396ac5f78a4897f2b Author: baomingyu <baomingy...@163.com> AuthorDate: Thu Mar 7 16:42:57 2024 +0800 [INLONG-9783][Sort] Add compatibility processing of tid to streamId changes in the message deserialization base class (#9785) --- .../deserialization/InLongMsgCsv2DeserializationInfo.java | 7 +++++-- .../deserialization/InLongMsgCsvDeserializationInfo.java | 9 ++++++--- .../protocol/deserialization/InLongMsgDeserializationInfo.java | 10 +++++++++- .../deserialization/InLongMsgKvDeserializationInfo.java | 7 +++++-- .../deserialization/InLongMsgTlogCsvDeserializationInfo.java | 7 +++++-- .../deserialization/InLongMsgTlogKvDeserializationInfo.java | 7 +++++-- .../sort/protocol/deserialization/JsonDeserializationInfo.java | 3 +++ .../sort/protocol/deserialization/KvDeserializationInfo.java | 5 ++++- 8 files changed, 42 insertions(+), 13 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java index 0b8e45d241..cb892c64fb 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java @@ -17,7 +17,9 @@ package org.apache.inlong.sort.protocol.deserialization; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; @@ -28,6 +30,7 @@ import java.util.Objects; /** * It represents CSV2 format of InLongMsg(m=9). */ +@JsonIgnoreProperties(ignoreUnknown = true) public class InLongMsgCsv2DeserializationInfo extends InLongMsgDeserializationInfo { private static final long serialVersionUID = 2188769102604850019L; @@ -39,14 +42,14 @@ public class InLongMsgCsv2DeserializationInfo extends InLongMsgDeserializationIn private final Character escapeChar; public InLongMsgCsv2DeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("delimiter") char delimiter) { this(streamId, delimiter, null); } @JsonCreator public InLongMsgCsv2DeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("delimiter") char delimiter, @JsonProperty("escape_char") @Nullable Character escapeChar) { super(streamId); diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java index d5817b4503..197be48b47 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java @@ -17,7 +17,9 @@ package org.apache.inlong.sort.protocol.deserialization; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; @@ -29,6 +31,7 @@ import java.util.Objects; /** * It represents CSV format of InLongMsg(m=0). */ +@JsonIgnoreProperties(ignoreUnknown = true) public class InLongMsgCsvDeserializationInfo extends InLongMsgDeserializationInfo { private static final long serialVersionUID = 1499370571949888870L; @@ -43,13 +46,13 @@ public class InLongMsgCsvDeserializationInfo extends InLongMsgDeserializationInf private final boolean deleteHeadDelimiter; public InLongMsgCsvDeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("delimiter") char delimiter) { this(streamId, delimiter, null, false); } public InLongMsgCsvDeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("delimiter") char delimiter, @JsonProperty("delete_head_delimiter") boolean deleteHeadDelimiter) { this(streamId, delimiter, null, deleteHeadDelimiter); @@ -57,7 +60,7 @@ public class InLongMsgCsvDeserializationInfo extends InLongMsgDeserializationInf @JsonCreator public InLongMsgCsvDeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("delimiter") char delimiter, @JsonProperty("escape_char") @Nullable Character escapeChar, @JsonProperty("delete_head_delimiter") boolean deleteHeadDelimiter) { diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java index 357a4fb886..723031e9a3 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.protocol.deserialization; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import static com.google.common.base.Preconditions.checkNotNull; @@ -24,18 +26,24 @@ import static com.google.common.base.Preconditions.checkNotNull; /** * InLongMsgDeserializationInfo. */ +@JsonIgnoreProperties(ignoreUnknown = true) public abstract class InLongMsgDeserializationInfo implements DeserializationInfo { private static final long serialVersionUID = 3707412713264864315L; private final String streamId; - public InLongMsgDeserializationInfo(@JsonProperty("streamId") String streamId) { + public InLongMsgDeserializationInfo(@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId) { this.streamId = checkNotNull(streamId); } @JsonProperty("streamId") + @JsonAlias(value = {"tid"}) public String getStreamId() { return streamId; } + + public String getTid() { + return streamId; + } } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java index 99ff27f8ba..de999da6ff 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java @@ -17,7 +17,9 @@ package org.apache.inlong.sort.protocol.deserialization; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; @@ -29,6 +31,7 @@ import java.util.Objects; /** * It represents KV format of InLongMsg(m=5). */ +@JsonIgnoreProperties(ignoreUnknown = true) public class InLongMsgKvDeserializationInfo extends InLongMsgDeserializationInfo { private static final long serialVersionUID = 8431516458466278968L; @@ -46,7 +49,7 @@ public class InLongMsgKvDeserializationInfo extends InLongMsgDeserializationInfo private final Character lineDelimiter; public InLongMsgKvDeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("entry_delimiter") char entryDelimiter, @JsonProperty("kv_delimiter") char kvDelimiter) { this(streamId, entryDelimiter, kvDelimiter, null, null); @@ -54,7 +57,7 @@ public class InLongMsgKvDeserializationInfo extends InLongMsgDeserializationInfo @JsonCreator public InLongMsgKvDeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("entry_delimiter") char entryDelimiter, @JsonProperty("kv_delimiter") char kvDelimiter, @JsonProperty("escape_char") @Nullable Character escapeChar, diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java index 223a12d2b5..ba935da023 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java @@ -17,7 +17,9 @@ package org.apache.inlong.sort.protocol.deserialization; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; @@ -29,6 +31,7 @@ import java.util.Objects; /** * It represents TLog CSV format of InLongMsg(m=10). */ +@JsonIgnoreProperties(ignoreUnknown = true) public class InLongMsgTlogCsvDeserializationInfo extends InLongMsgDeserializationInfo { private static final long serialVersionUID = -6585242216925992303L; @@ -40,14 +43,14 @@ public class InLongMsgTlogCsvDeserializationInfo extends InLongMsgDeserializatio private final Character escapeChar; public InLongMsgTlogCsvDeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("delimiter") char delimiter) { this(streamId, delimiter, null); } @JsonCreator public InLongMsgTlogCsvDeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("delimiter") char delimiter, @JsonProperty("escape_char") @Nullable Character escapeChar) { super(streamId); diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java index 77ad77ac82..6ec72fdd7a 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java @@ -17,7 +17,9 @@ package org.apache.inlong.sort.protocol.deserialization; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; @@ -29,6 +31,7 @@ import java.util.Objects; /** * It represents TLog KV format of InLongMsg(m=15). */ +@JsonIgnoreProperties(ignoreUnknown = true) public class InLongMsgTlogKvDeserializationInfo extends InLongMsgDeserializationInfo { private static final long serialVersionUID = 3299931901024581425L; @@ -44,7 +47,7 @@ public class InLongMsgTlogKvDeserializationInfo extends InLongMsgDeserialization private final Character escapeChar; public InLongMsgTlogKvDeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("delimiter") char delimiter, @JsonProperty("entry_delimiter") char entryDelimiter, @JsonProperty("kv_delimiter") char kvDelimiter) { @@ -53,7 +56,7 @@ public class InLongMsgTlogKvDeserializationInfo extends InLongMsgDeserialization @JsonCreator public InLongMsgTlogKvDeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("delimiter") char delimiter, @JsonProperty("entry_delimiter") char entryDelimiter, @JsonProperty("kv_delimiter") char kvDelimiter, diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java index 8b19ad729b..110c3c44aa 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java @@ -17,9 +17,12 @@ package org.apache.inlong.sort.protocol.deserialization; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; + /** * Json deserialization info */ +@JsonIgnoreProperties(ignoreUnknown = true) public class JsonDeserializationInfo implements DeserializationInfo { private static final long serialVersionUID = -5344203248610337314L; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java index 583316c783..dd14eaebdd 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java @@ -18,7 +18,9 @@ package org.apache.inlong.sort.protocol.deserialization; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; @@ -29,6 +31,7 @@ import java.util.Objects; /** * Kv deserialization info */ +@JsonIgnoreProperties(ignoreUnknown = true) public class KvDeserializationInfo extends InLongMsgDeserializationInfo { private static final long serialVersionUID = -3182881360079888043L; @@ -58,7 +61,7 @@ public class KvDeserializationInfo extends InLongMsgDeserializationInfo { @JsonCreator public KvDeserializationInfo( - @JsonProperty("streamId") String streamId, + @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId, @JsonProperty("entry_splitter") char entrySplitter, @JsonProperty("kv_splitter") char kvSplitter, @JsonProperty("escape_char") @Nullable Character escapeChar) {