This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 03933e4072 [INLONG-9783][Sort] Add compatibility processing of tid to streamId changes in the message deserialization base class (#9785)
03933e4072 is described below

commit 03933e40722a15a50225c52396ac5f78a4897f2b
Author: baomingyu <baomingy...@163.com>
AuthorDate: Thu Mar 7 16:42:57 2024 +0800

    [INLONG-9783][Sort] Add compatibility processing of tid to streamId changes in the message deserialization base class (#9785)
---
 .../deserialization/InLongMsgCsv2DeserializationInfo.java      |  7 +++++--
 .../deserialization/InLongMsgCsvDeserializationInfo.java       |  9 ++++++---
 .../protocol/deserialization/InLongMsgDeserializationInfo.java | 10 +++++++++-
 .../deserialization/InLongMsgKvDeserializationInfo.java        |  7 +++++--
 .../deserialization/InLongMsgTlogCsvDeserializationInfo.java   |  7 +++++--
 .../deserialization/InLongMsgTlogKvDeserializationInfo.java    |  7 +++++--
 .../sort/protocol/deserialization/JsonDeserializationInfo.java |  3 +++
 .../sort/protocol/deserialization/KvDeserializationInfo.java   |  5 ++++-
 8 files changed, 42 insertions(+), 13 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
index 0b8e45d241..cb892c64fb 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -28,6 +30,7 @@ import java.util.Objects;
 /**
  * It represents CSV2 format of InLongMsg(m=9).
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class InLongMsgCsv2DeserializationInfo extends 
InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = 2188769102604850019L;
@@ -39,14 +42,14 @@ public class InLongMsgCsv2DeserializationInfo extends 
InLongMsgDeserializationIn
     private final Character escapeChar;
 
     public InLongMsgCsv2DeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter) {
         this(streamId, delimiter, null);
     }
 
     @JsonCreator
     public InLongMsgCsv2DeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
         super(streamId);
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
index d5817b4503..197be48b47 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +31,7 @@ import java.util.Objects;
 /**
  * It represents CSV format of InLongMsg(m=0).
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class InLongMsgCsvDeserializationInfo extends 
InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = 1499370571949888870L;
@@ -43,13 +46,13 @@ public class InLongMsgCsvDeserializationInfo extends 
InLongMsgDeserializationInf
     private final boolean deleteHeadDelimiter;
 
     public InLongMsgCsvDeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter) {
         this(streamId, delimiter, null, false);
     }
 
     public InLongMsgCsvDeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("delete_head_delimiter") boolean 
deleteHeadDelimiter) {
         this(streamId, delimiter, null, deleteHeadDelimiter);
@@ -57,7 +60,7 @@ public class InLongMsgCsvDeserializationInfo extends 
InLongMsgDeserializationInf
 
     @JsonCreator
     public InLongMsgCsvDeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar,
             @JsonProperty("delete_head_delimiter") boolean 
deleteHeadDelimiter) {
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
index 357a4fb886..723031e9a3 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -24,18 +26,24 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 /**
  * InLongMsgDeserializationInfo.
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public abstract class InLongMsgDeserializationInfo implements 
DeserializationInfo {
 
     private static final long serialVersionUID = 3707412713264864315L;
 
     private final String streamId;
 
-    public InLongMsgDeserializationInfo(@JsonProperty("streamId") String 
streamId) {
+    public InLongMsgDeserializationInfo(@JsonProperty("streamId") 
@JsonAlias(value = {"tid"}) String streamId) {
         this.streamId = checkNotNull(streamId);
     }
 
     @JsonProperty("streamId")
+    @JsonAlias(value = {"tid"})
     public String getStreamId() {
         return streamId;
     }
+
+    public String getTid() {
+        return streamId;
+    }
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
index 99ff27f8ba..de999da6ff 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +31,7 @@ import java.util.Objects;
 /**
  * It represents KV format of InLongMsg(m=5).
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class InLongMsgKvDeserializationInfo extends 
InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = 8431516458466278968L;
@@ -46,7 +49,7 @@ public class InLongMsgKvDeserializationInfo extends 
InLongMsgDeserializationInfo
     private final Character lineDelimiter;
 
     public InLongMsgKvDeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter) {
         this(streamId, entryDelimiter, kvDelimiter, null, null);
@@ -54,7 +57,7 @@ public class InLongMsgKvDeserializationInfo extends 
InLongMsgDeserializationInfo
 
     @JsonCreator
     public InLongMsgKvDeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar,
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
index 223a12d2b5..ba935da023 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +31,7 @@ import java.util.Objects;
 /**
  * It represents TLog CSV format of InLongMsg(m=10).
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class InLongMsgTlogCsvDeserializationInfo extends 
InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = -6585242216925992303L;
@@ -40,14 +43,14 @@ public class InLongMsgTlogCsvDeserializationInfo extends 
InLongMsgDeserializatio
     private final Character escapeChar;
 
     public InLongMsgTlogCsvDeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter) {
         this(streamId, delimiter, null);
     }
 
     @JsonCreator
     public InLongMsgTlogCsvDeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
         super(streamId);
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
index 77ad77ac82..6ec72fdd7a 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +31,7 @@ import java.util.Objects;
 /**
  * It represents TLog KV format of InLongMsg(m=15).
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class InLongMsgTlogKvDeserializationInfo extends 
InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = 3299931901024581425L;
@@ -44,7 +47,7 @@ public class InLongMsgTlogKvDeserializationInfo extends 
InLongMsgDeserialization
     private final Character escapeChar;
 
     public InLongMsgTlogKvDeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter) {
@@ -53,7 +56,7 @@ public class InLongMsgTlogKvDeserializationInfo extends 
InLongMsgDeserialization
 
     @JsonCreator
     public InLongMsgTlogKvDeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter,
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
index 8b19ad729b..110c3c44aa 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
@@ -17,9 +17,12 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
 /**
  * Json deserialization info
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class JsonDeserializationInfo implements DeserializationInfo {
 
     private static final long serialVersionUID = -5344203248610337314L;
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
index 583316c783..dd14eaebdd 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
@@ -18,7 +18,9 @@
 package org.apache.inlong.sort.protocol.deserialization;
 
 import org.apache.commons.lang3.StringUtils;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -29,6 +31,7 @@ import java.util.Objects;
 /**
  * Kv deserialization info
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class KvDeserializationInfo extends InLongMsgDeserializationInfo {
 
     private static final long serialVersionUID = -3182881360079888043L;
@@ -58,7 +61,7 @@ public class KvDeserializationInfo extends 
InLongMsgDeserializationInfo {
 
     @JsonCreator
     public KvDeserializationInfo(
-            @JsonProperty("streamId") String streamId,
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("entry_splitter") char entrySplitter,
             @JsonProperty("kv_splitter") char kvSplitter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {

Reply via email to