This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6936c319fd [INLONG-9075][Sort] TubeMQSource support InlongMsg format (#9076)
6936c319fd is described below

commit 6936c319fdfe60adb9aeb7bc60d2adbd7c26697b
Author: vernedeng <verned...@apache.org>
AuthorDate: Fri Oct 20 14:02:20 2023 +0800

    [INLONG-9075][Sort] TubeMQSource support InlongMsg format (#9076)
    
    * [INLONG-9075][Sort] TubeMQSource support InlongMsg format
---
 .../manager/pojo/sort/node/provider/TubeMqProvider.java  |  3 ++-
 .../inlong/manager/pojo/source/tubemq/TubeMQSource.java  |  3 +++
 .../inlong/sort/protocol/constant/TubeMQConstant.java    |  3 ++-
 .../apache/inlong/sort/protocol/node/ExtractNode.java    |  2 ++
 .../sort/protocol/node/extract/TubeMQExtractNode.java    | 16 ++++++++++++++--
 .../protocol/node/extract/TubeMQExtractNodeTest.java     |  2 +-
 .../inlong/sort/parser/TubeMQNodeSqlParseTest.java       |  2 +-
 .../sort/tubemq/table/TubeMQDynamicTableFactory.java     |  2 +-
 .../sort/tubemq/table/TubeMQDynamicTableFactory.java     |  8 +++-----
 .../apache/inlong/sort/tubemq/table/TubeMQOptions.java   |  7 +++++++
 10 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index 72399221f8..64b0c0a71f 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -55,6 +55,7 @@ public class TubeMqProvider implements ExtractNodeProvider {
                 source.getSerializationType(),
                 source.getGroupId(),
                 source.getSessionKey(),
-                source.getTid());
+                source.getTid(),
+                source.getInnerFormat());
     }
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
index 6e6720208c..0a0e49127f 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
@@ -57,6 +57,9 @@ public class TubeMQSource extends StreamSource {
     @ApiModelProperty("Session key of the TubeMQ")
     private String sessionKey;
 
+    @ApiModelProperty("inlong-msg.inner.format")
+    private String innerFormat;
+
     /**
      * The TubeMQ consumers use this tid set to filter records reading from server.
      */
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
index e5b575d053..871e3eba41 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
@@ -28,11 +28,12 @@ public class TubeMQConstant {
 
     public static final String CONNECTOR = "connector";
 
-    public static final String TUBEMQ = "tubemq";
+    public static final String TUBEMQ = "tubemq-inlong";
 
     public static final String MASTER_RPC = "master.rpc";
 
     public static final String FORMAT = "format";
+    public static final String INNER_FORMAT = "inlong-msg.inner.format";
 
     public static final String SESSION_KEY = "session.key";
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index 09e67d573d..b9d649d621 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -71,6 +71,8 @@ import java.util.Map;
 @NoArgsConstructor
 public abstract class ExtractNode implements Node {
 
+    public static final String INLONG_MSG = "inlong-msg";
+
     @JsonProperty("id")
     private String id;
     @JsonInclude(Include.NON_NULL)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
index 71a5347e67..bbcdbbecd5 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.protocol.node.extract;
 
+import org.apache.inlong.sort.formats.util.StringUtils;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.constant.TubeMQConstant;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
@@ -72,6 +73,9 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable {
     @JsonProperty("tid")
     private TreeSet<String> tid;
 
+    @JsonProperty("inlong-msg.inner.format")
+    private String innerFormat;
+
     @JsonCreator
     public TubeMQExtractNode(
             @JsonProperty("id") String id,
@@ -84,13 +88,15 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable {
             @Nonnull @JsonProperty("format") String format,
             @Nonnull @JsonProperty("groupId") String groupId,
             @JsonProperty("sessionKey") String sessionKey,
-            @JsonProperty("tid") TreeSet<String> tid) {
+            @JsonProperty("tid") TreeSet<String> tid,
+            @JsonProperty("inlong-msg.inner.format") String innerFormat) {
         super(id, name, fields, waterMarkField, properties);
         this.masterRpc = Preconditions.checkNotNull(masterRpc, "TubeMQ masterRpc is null");
         this.topic = Preconditions.checkNotNull(topic, "TubeMQ topic is null");
         this.format = Preconditions.checkNotNull(format, "Format is null");
         this.groupId = Preconditions.checkNotNull(groupId, "Group id is null");
         this.sessionKey = sessionKey;
+        this.innerFormat = innerFormat;
         this.tid = tid;
     }
 
@@ -103,9 +109,15 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable {
         map.put(TubeMQConstant.GROUP_ID, groupId);
         map.put(TubeMQConstant.FORMAT, format);
         map.put(TubeMQConstant.SESSION_KEY, sessionKey);
+        if (format.startsWith(INLONG_MSG)) {
+            map.put(TubeMQConstant.INNER_FORMAT, innerFormat);
+        }
+
         if (null != tid && !tid.isEmpty()) {
-            map.put(TubeMQConstant.TID, tid.toString());
+            map.put(TubeMQConstant.TID, StringUtils.concatCsv(tid.toArray(new String[0]),
+                    ',', null, null));
         }
+
         return map;
     }
 
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java
index 3943bdbfe5..3b824e0280 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java
@@ -40,7 +40,7 @@ public class TubeMQExtractNodeTest extends SerializeBaseTest<TubeMQExtractNode>
                 new FieldInfo("salary", new FloatFormatInfo()));
 
         return new TubeMQExtractNode("1", "tubeMQ_input", fields, null, null,
-                "127.0.0.1:8715", "inlong", "json", "test", null, null);
+                "127.0.0.1:8715", "inlong", "json", "test", null, null, null);
 
     }
 }
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
index 9d6cb5336c..d9421e9e6d 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
@@ -59,7 +59,7 @@ public class TubeMQNodeSqlParseTest extends AbstractTestBase {
                 new FieldInfo("salary", new FloatFormatInfo()));
 
         return new TubeMQExtractNode(id, "tubeMQ_input", fields, null, null,
-                "127.0.0.1:8715", "inlong", "json", "test", null, null);
+                "127.0.0.1:8715", "inlong", "json", "test", null, null, null);
     }
 
     /**
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 28a538c22a..7101a475a7 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -58,7 +58,7 @@ import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQPropert
  */
 public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory {
 
-    public static final String IDENTIFIER = "tubemq";
+    public static final String IDENTIFIER = "tubemq-inlong";
 
     public static final List<String> INNERFORMATTYPE = Arrays.asList("inlong-msg");
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 17275d8d11..f6130b5288 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -40,9 +40,7 @@ import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
-import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
@@ -63,9 +61,9 @@ import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQPropert
  */
 public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
-    public static final String IDENTIFIER = "tubemq";
+    public static final String IDENTIFIER = "tubemq-inlong";
 
-    public static final List<String> INNERFORMATTYPE = Arrays.asList("inlong-msg");
+    public static final String INNERFORMATTYPE = "inlong-msg";
 
     public static boolean innerFormat = false;
 
@@ -87,7 +85,6 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn
                 && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
             Configuration options = Configuration.fromMap(catalogTable.getOptions());
             String formatName = options.getOptional(FORMAT).orElse(options.get(FORMAT));
-            innerFormat = INNERFORMATTYPE.contains(formatName);
             throw new ValidationException(String.format(
                     "The TubeMQ table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
                             + " on the table, because it can't guarantee the semantic of primary key.",
@@ -123,6 +120,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn
         helper.validate();
 
         validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
+        innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT));
 
         final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions());
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
index 0085100c87..76f85f0563 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
@@ -77,6 +77,13 @@ public class TubeMQOptions {
     // TubeMQ specific options
     // --------------------------------------------------------------------------------------------
 
+    public static final ConfigOption<String> INNER_FORMAT =
+            ConfigOptions.key("inlong-msg.inner.format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Inner format");
+
     public static final ConfigOption<String> TOPIC =
             ConfigOptions.key("topic")
                     .stringType()

Reply via email to