This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e60e726aa [INLONG-9077][Sort] Fix TubeMQ connector fail to subscribe streamId (#9078)
0e60e726aa is described below

commit 0e60e726aa2531f63f6314e6c0430157859bc1db
Author: vernedeng <verned...@apache.org>
AuthorDate: Fri Oct 20 17:29:28 2023 +0800

    [INLONG-9077][Sort] Fix TubeMQ connector fail to subscribe streamId (#9078)
---
 inlong-distribution/pom.xml                        |  2 +-
 .../pojo/sort/node/provider/TubeMqProvider.java    |  4 ++--
 .../manager/pojo/source/tubemq/TubeMQSource.java   |  6 +++---
 .../source/tubemq/TubeMQSourceOperator.java        |  2 +-
 .../sort/protocol/constant/TubeMQConstant.java     |  6 +++---
 .../protocol/node/extract/TubeMQExtractNode.java   | 24 +++++++++++-----------
 inlong-sort/sort-core/pom.xml                      |  6 ++++++
 .../tubemq/table/TubeMQDynamicTableFactory.java    |  8 ++++----
 .../inlong/sort/tubemq/table/TubeMQOptions.java    | 19 +++++++++++------
 .../sort-connectors/pulsar/pom.xml                 |  2 ++
 .../inlong/sort/tubemq/FlinkTubeMQConsumer.java    | 14 ++++++-------
 .../inlong/sort/tubemq/FlinkTubeMQProducer.java    | 10 ++++-----
 .../tubemq/table/TubeMQDynamicTableFactory.java    |  4 ++--
 .../inlong/sort/tubemq/table/TubeMQOptions.java    |  8 ++++----
 .../inlong/sort/tubemq/table/TubeMQTableSink.java  | 14 ++++++-------
 .../sort/tubemq/table/TubeMQTableSource.java       | 18 ++++++++--------
 inlong-tubemq/tubemq-client/pom.xml                |  2 ++
 inlong-tubemq/tubemq-server/pom.xml                |  1 +
 18 files changed, 84 insertions(+), 66 deletions(-)

diff --git a/inlong-distribution/pom.xml b/inlong-distribution/pom.xml
index 5107b1a0fa..14b4a30804 100644
--- a/inlong-distribution/pom.xml
+++ b/inlong-distribution/pom.xml
@@ -73,7 +73,7 @@
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>exec-maven-plugin</artifactId>
-                <version>3.1.0</version>
+                <version>${exec.maven.version}</version>
                 <configuration>
                     
<executable>${basedir}/script/backup_module_dependencys.sh</executable>
                 </configuration>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index 64b0c0a71f..daaf5a94fb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -53,9 +53,9 @@ public class TubeMqProvider implements ExtractNodeProvider {
                 source.getMasterRpc(),
                 source.getTopic(),
                 source.getSerializationType(),
-                source.getGroupId(),
+                source.getConsumeGroup(),
                 source.getSessionKey(),
-                source.getTid(),
+                source.getStreamId(),
                 source.getInnerFormat());
     }
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
index 0a0e49127f..a295b0329e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
@@ -52,7 +52,7 @@ public class TubeMQSource extends StreamSource {
     private String topic;
 
     @ApiModelProperty("Group of the TubeMQ")
-    private String groupId;
+    private String consumeGroup;
 
     @ApiModelProperty("Session key of the TubeMQ")
     private String sessionKey;
@@ -61,10 +61,10 @@ public class TubeMQSource extends StreamSource {
     private String innerFormat;
 
     /**
-     * The TubeMQ consumers use this tid set to filter records reading from server.
+     * The TubeMQ consumers use this streamId set to filter records reading from server.
      */
     @ApiModelProperty("Tid of the TubeMQ")
-    private TreeSet<String> tid;
+    private TreeSet<String> streamId;
 
     public TubeMQSource() {
         this.setSourceType(SourceType.TUBEMQ);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 7fc56539d9..df7af84b09 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -107,7 +107,7 @@ public class TubeMQSourceOperator extends AbstractSourceOperator {
             String streamId = streamInfo.getInlongStreamId();
             tubeMQSource.setSourceName(streamId);
             tubeMQSource.setTopic(groupInfo.getMqResource());
-            tubeMQSource.setGroupId(streamId);
+            tubeMQSource.setConsumeGroup(streamId);
             tubeMQSource.setMasterRpc(masterRpc);
             tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
index 871e3eba41..eee84f1186 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
@@ -24,7 +24,7 @@ public class TubeMQConstant {
 
     public static final String TOPIC = "topic";
 
-    public static final String GROUP_ID = "group.id";
+    public static final String CONSUME_GROUP = "consume.group";
 
     public static final String CONNECTOR = "connector";
 
@@ -38,9 +38,9 @@ public class TubeMQConstant {
     public static final String SESSION_KEY = "session.key";
 
     /**
-     * The tubemq consumers use this tid set to filter records reading from server.
+     * The tubemq consumers use this streamId set to filter records reading from server.
      */
-    public static final String TID = "tid";
+    public static final String STREAMID = "stream.id";
 
     public static final String CONSUMER_STARTUP_MODE = "consumer.startup.mode";
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
index bbcdbbecd5..4c55adb3de 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
@@ -61,17 +61,17 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable {
     private String format;
 
     @Nonnull
-    @JsonProperty("groupId")
-    private String groupId;
+    @JsonProperty("consumeGroup")
+    private String consumeGroup;
 
     @JsonProperty("sessionKey")
     private String sessionKey;
 
     /**
-     * The tubemq consumers use this tid set to filter records reading from server.
+     * The tubemq consumers use this streamId set to filter records reading from server.
      */
-    @JsonProperty("tid")
-    private TreeSet<String> tid;
+    @JsonProperty("streamId")
+    private TreeSet<String> streamId;
 
     @JsonProperty("inlong-msg.inner.format")
     private String innerFormat;
@@ -86,18 +86,18 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable {
             @Nonnull @JsonProperty("masterRpc") String masterRpc,
             @Nonnull @JsonProperty("topic") String topic,
             @Nonnull @JsonProperty("format") String format,
-            @Nonnull @JsonProperty("groupId") String groupId,
+            @Nonnull @JsonProperty("consumeGroup") String consumeGroup,
             @JsonProperty("sessionKey") String sessionKey,
-            @JsonProperty("tid") TreeSet<String> tid,
+            @JsonProperty("streamId") TreeSet<String> streamId,
             @JsonProperty("inlong-msg.inner.format") String innerFormat) {
         super(id, name, fields, waterMarkField, properties);
         this.masterRpc = Preconditions.checkNotNull(masterRpc, "TubeMQ masterRpc is null");
         this.topic = Preconditions.checkNotNull(topic, "TubeMQ topic is null");
         this.format = Preconditions.checkNotNull(format, "Format is null");
-        this.groupId = Preconditions.checkNotNull(groupId, "Group id is null");
+        this.consumeGroup = Preconditions.checkNotNull(consumeGroup, "Group id is null");
         this.sessionKey = sessionKey;
+        this.streamId = streamId;
         this.innerFormat = innerFormat;
-        this.tid = tid;
     }
 
     @Override
@@ -106,15 +106,15 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable {
         map.put(TubeMQConstant.CONNECTOR, TubeMQConstant.TUBEMQ);
         map.put(TubeMQConstant.TOPIC, topic);
         map.put(TubeMQConstant.MASTER_RPC, masterRpc);
-        map.put(TubeMQConstant.GROUP_ID, groupId);
+        map.put(TubeMQConstant.CONSUME_GROUP, consumeGroup);
         map.put(TubeMQConstant.FORMAT, format);
         map.put(TubeMQConstant.SESSION_KEY, sessionKey);
         if (format.startsWith(INLONG_MSG)) {
             map.put(TubeMQConstant.INNER_FORMAT, innerFormat);
         }
 
-        if (null != tid && !tid.isEmpty()) {
-            map.put(TubeMQConstant.TID, StringUtils.concatCsv(tid.toArray(new String[0]),
+        if (null != streamId && !streamId.isEmpty()) {
+            map.put(TubeMQConstant.STREAMID, StringUtils.concatCsv(streamId.toArray(new String[0]),
                     ',', null, null));
         }
 
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 50b41d6833..c463785605 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -245,6 +245,12 @@
         <profile>
             <id>v1.15</id>
             <dependencies>
+                <dependency>
+                    <groupId>org.apache.inlong</groupId>
+                    <artifactId>sort-connector-tubemq-v1.15</artifactId>
+                    <version>${project.version}</version>
+                    <scope>test</scope>
+                </dependency>
                 <dependency>
                     <groupId>org.apache.inlong</groupId>
                     <artifactId>sort-connector-postgres-cdc-v1.15</artifactId>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 7101a475a7..8aad010fba 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -44,11 +44,11 @@ import java.util.TreeSet;
 
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_ID;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TID;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.STREAMID;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC_PATTERN;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQProperties;
@@ -169,8 +169,8 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory {
         final Set<ConfigOption<?>> options = new HashSet<>();
         options.add(FORMAT);
         options.add(TOPIC);
-        options.add(GROUP_ID);
-        options.add(TID);
+        options.add(CONSUME_GROUP);
+        options.add(STREAMID);
         options.add(SESSION_KEY);
         options.add(BOOTSTRAP_FROM_MAX);
         options.add(TOPIC_PATTERN);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
index ba069094bb..357da2dd09 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
@@ -87,6 +87,13 @@ public class TubeMQOptions {
     // TubeMQ specific options
     // --------------------------------------------------------------------------------------------
 
+    public static final ConfigOption<String> INNER_FORMAT =
+            ConfigOptions.key("inlong-msg.inner.format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Inner format");
+
     public static final ConfigOption<String> TOPIC =
             ConfigOptions.key("topic")
                     .stringType()
@@ -109,8 +116,8 @@ public class TubeMQOptions {
                     .noDefaultValue()
                     .withDescription("Required TubeMQ master connection string");
 
-    public static final ConfigOption<String> GROUP_ID =
-            ConfigOptions.key("group.id")
+    public static final ConfigOption<String> CONSUME_GROUP =
+            ConfigOptions.key("consume.group")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
@@ -140,8 +147,8 @@ public class TubeMQOptions {
                     .defaultValue("default_session_key")
                     .withDescription("The session key for this consumer group at startup.");
 
-    public static final ConfigOption<List<String>> TID =
-            ConfigOptions.key("topic.tid")
+    public static final ConfigOption<List<String>> STREAMID =
+            ConfigOptions.key("stream.id")
                     .stringType()
                     .asList()
                     .noDefaultValue()
@@ -385,7 +392,7 @@ public class TubeMQOptions {
 
     public static TreeSet<String> getTiSet(ReadableConfig tableOptions) {
         TreeSet<String> set = new TreeSet<>();
-        tableOptions.getOptional(TID).ifPresent(new Consumer<List<String>>() {
+        tableOptions.getOptional(STREAMID).ifPresent(new Consumer<List<String>>() {
 
             @Override
             public void accept(List<String> strings) {
@@ -396,7 +403,7 @@ public class TubeMQOptions {
     }
 
     public static String getConsumerGroup(ReadableConfig tableOptions) {
-        return tableOptions.getOptional(GROUP_ID).orElse(null);
+        return tableOptions.getOptional(CONSUME_GROUP).orElse(null);
     }
 
     public static String getSessionKey(ReadableConfig tableOptions) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
index e01c3278fa..501d39c637 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
@@ -130,6 +130,7 @@
             <plugin>
                 <groupId>org.xolstice.maven.plugins</groupId>
                 <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${protobuf.maven.plugin.version}</version>
                 <extensions>true</extensions>
                 <configuration>
                     <!-- Currently Flink azure test pipeline would first pre-compile and then upload the compiled
@@ -157,6 +158,7 @@
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
+                <version>${build.helper.maven.version}</version>
                 <executions>
                     <execution>
                         <id>add-test-source</id>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index b890561681..0180f90e02 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -80,9 +80,9 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
     private final String topic;
 
     /**
-     * The tubemq consumers use this tid set to filter records reading from server.
+     * The tubemq consumers use this streamId set to filter records reading from server.
      */
-    private final TreeSet<String> tidSet;
+    private final TreeSet<String> streamIdSet;
 
     /**
      * The consumer group name.
@@ -147,7 +147,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
      *
      * @param masterAddress         the master address of TubeMQ
      * @param topic                 the topic name
-     * @param tidSet                the  topic's filter condition items
+     * @param streamIdSet                the  topic's filter condition items
      * @param consumerGroup         the consumer group name
      * @param deserializationSchema the deserialize schema
      * @param configuration         the configure
@@ -156,7 +156,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
     public FlinkTubeMQConsumer(
             String masterAddress,
             String topic,
-            TreeSet<String> tidSet,
+            TreeSet<String> streamIdSet,
             String consumerGroup,
             DeserializationSchema<T> deserializationSchema,
             Configuration configuration,
@@ -164,14 +164,14 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
             Boolean innerFormat) {
         checkNotNull(masterAddress, "The master address must not be null.");
         checkNotNull(topic, "The topic must not be null.");
-        checkNotNull(tidSet, "The tid set must not be null.");
+        checkNotNull(streamIdSet, "The streamId set must not be null.");
         checkNotNull(consumerGroup, "The consumer group must not be null.");
         checkNotNull(deserializationSchema, "The deserialization schema must not be null.");
         checkNotNull(configuration, "The configuration must not be null.");
 
         this.masterAddress = masterAddress;
         this.topic = topic;
-        this.tidSet = tidSet;
+        this.streamIdSet = streamIdSet;
         this.consumerGroup = consumerGroup;
         this.deserializationSchema = deserializationSchema;
         this.sessionKey = sessionKey;
@@ -217,7 +217,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
         final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
         messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
-        messagePullConsumer.subscribe(topic, tidSet);
+        messagePullConsumer.subscribe(topic, streamIdSet);
         messagePullConsumer.completeSubscribe(sessionKey, numTasks, true, currentOffsets);
 
         running = true;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
index fb2f624961..3f04594b67 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
@@ -60,9 +60,9 @@ public class FlinkTubeMQProducer<T> extends RichSinkFunction<T> implements Check
     private final String topic;
 
     /**
-     * The tubemq consumers use this tid set to filter records reading from server.
+     * The tubemq consumers use this streamId set to filter records reading from server.
      */
-    private final TreeSet<String> tidSet;
+    private final TreeSet<String> streamIdSet;
     /**
      * The serializer for the records sent to tube.
      */
@@ -86,12 +86,12 @@ public class FlinkTubeMQProducer<T> extends RichSinkFunction<T> implements Check
     public FlinkTubeMQProducer(String topic,
             String masterAddress,
             SerializationSchema<T> serializationSchema,
-            TreeSet<String> tidSet,
+            TreeSet<String> streamIdSet,
             Configuration configuration) {
         checkNotNull(topic, "The topic must not be null.");
         checkNotNull(masterAddress, "The master address must not be null.");
         checkNotNull(serializationSchema, "The serialization schema must not be null.");
-        checkNotNull(tidSet, "The tid set must not be null.");
+        checkNotNull(streamIdSet, "The streamId set must not be null.");
         checkNotNull(configuration, "The configuration must not be null.");
 
         int max_retries = configuration.getInteger(TubeMQOptions.MAX_RETRIES);
@@ -100,7 +100,7 @@ public class FlinkTubeMQProducer<T> extends RichSinkFunction<T> implements Check
         this.topic = topic;
         this.masterAddress = masterAddress;
         this.serializationSchema = serializationSchema;
-        this.tidSet = tidSet;
+        this.streamIdSet = streamIdSet;
         this.maxRetries = max_retries;
     }
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index f6130b5288..81b6709418 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -47,7 +47,7 @@ import java.util.TreeSet;
 
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_NAME;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
@@ -220,7 +220,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn
         final Set<ConfigOption<?>> options = new HashSet<>();
         options.add(FORMAT);
         options.add(TOPIC);
-        options.add(GROUP_NAME);
+        options.add(CONSUME_GROUP);
         options.add(STREAMID);
         options.add(SESSION_KEY);
         options.add(BOOTSTRAP_FROM_MAX);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
index 76f85f0563..0b4ea93978 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
@@ -106,8 +106,8 @@ public class TubeMQOptions {
                     .noDefaultValue()
                     .withDescription("Required TubeMQ master connection string");
 
-    public static final ConfigOption<String> GROUP_NAME =
-            ConfigOptions.key("group.name")
+    public static final ConfigOption<String> CONSUME_GROUP =
+            ConfigOptions.key("consume.group")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
@@ -138,7 +138,7 @@ public class TubeMQOptions {
                     .withDescription("The session key for this consumer group at startup.");
 
     public static final ConfigOption<List<String>> STREAMID =
-            ConfigOptions.key("topic.streamId")
+            ConfigOptions.key("stream.id")
                     .stringType()
                     .asList()
                     .noDefaultValue()
@@ -275,7 +275,7 @@ public class TubeMQOptions {
     }
 
     public static String getConsumerGroup(ReadableConfig tableOptions) {
-        return tableOptions.getOptional(GROUP_NAME).orElse(null);
+        return tableOptions.getOptional(CONSUME_GROUP).orElse(null);
     }
 
     public static String getSessionKey(ReadableConfig tableOptions) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
index 5d2f8c2a4d..cf5109d82f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
@@ -50,9 +50,9 @@ public class TubeMQTableSink implements DynamicTableSink {
      */
     private final String masterAddress;
     /**
-     * The TubeMQ tid filter collection.
+     * The TubeMQ streamId filter collection.
      */
-    private final TreeSet<String> tidSet;
+    private final TreeSet<String> streamIdSet;
     /**
      * The parameters collection for tubemq producer.
      */
@@ -63,20 +63,20 @@ public class TubeMQTableSink implements DynamicTableSink {
             EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
             String topic,
             String masterAddress,
-            TreeSet<String> tidSet,
+            TreeSet<String> streamIdSet,
             Configuration configuration) {
         Preconditions.checkNotNull(valueEncodingFormat, "The serialization schema must not be null.");
         Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
         Preconditions.checkNotNull(topic, "Topic must not be null.");
         Preconditions.checkNotNull(masterAddress, "Master address must not be null.");
         Preconditions.checkNotNull(configuration, "The configuration must not be null.");
-        Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+        Preconditions.checkNotNull(streamIdSet, "The streamId set must not be null.");
 
         this.valueEncodingFormat = valueEncodingFormat;
         this.physicalDataType = physicalDataType;
         this.topic = topic;
         this.masterAddress = masterAddress;
-        this.tidSet = tidSet;
+        this.streamIdSet = streamIdSet;
         this.configuration = configuration;
     }
 
@@ -102,7 +102,7 @@ public class TubeMQTableSink implements DynamicTableSink {
             SerializationSchema<RowData> serializationSchema,
             Configuration configuration) {
         final FlinkTubeMQProducer<RowData> tubeMQProducer =
-                new FlinkTubeMQProducer(topic, masterAddress, serializationSchema, tidSet, configuration);
+                new FlinkTubeMQProducer(topic, masterAddress, serializationSchema, streamIdSet, configuration);
         return tubeMQProducer;
     }
 
@@ -120,7 +120,7 @@ public class TubeMQTableSink implements DynamicTableSink {
                 valueEncodingFormat,
                 topic,
                 masterAddress,
-                tidSet,
+                streamIdSet,
                 configuration);
     }
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index 2dc21ca2e8..e79685ff70 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -84,9 +84,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
      */
     private final String topic;
     /**
-     * The TubeMQ tid filter collection.
+     * The TubeMQ streamId filter collection.
      */
-    private final TreeSet<String> tidSet;
+    private final TreeSet<String> streamIdSet;
     /**
      * The TubeMQ consumer group name.
      */
@@ -129,7 +129,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
     public TubeMQTableSource(DataType physicalDataType,
             DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
             String masterAddress, String topic,
-            TreeSet<String> tidSet, String consumerGroup, String sessionKey,
+            TreeSet<String> streamIdSet, String consumerGroup, String sessionKey,
             Configuration configuration, @Nullable WatermarkStrategy<RowData> watermarkStrategy,
             Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat) {
 
@@ -137,7 +137,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
         Preconditions.checkNotNull(valueDecodingFormat, "The deserialization schema must not be null.");
         Preconditions.checkNotNull(masterAddress, "The master address must not be null.");
         Preconditions.checkNotNull(topic, "The topic must not be null.");
-        Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+        Preconditions.checkNotNull(streamIdSet, "The streamId set must not be null.");
         Preconditions.checkNotNull(consumerGroup, "The consumer group must not be null.");
         Preconditions.checkNotNull(configuration, "The configuration must not be null.");
 
@@ -147,7 +147,7 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
         this.valueDecodingFormat = valueDecodingFormat;
         this.masterAddress = masterAddress;
         this.topic = topic;
-        this.tidSet = tidSet;
+        this.streamIdSet = streamIdSet;
         this.consumerGroup = consumerGroup;
         this.sessionKey = sessionKey;
         this.configuration = configuration;
@@ -182,7 +182,7 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
     public DynamicTableSource copy() {
         return new TubeMQTableSource(
                 physicalDataType, valueDecodingFormat, masterAddress,
-                topic, tidSet, consumerGroup, sessionKey, configuration,
+                topic, streamIdSet, consumerGroup, sessionKey, configuration,
                 watermarkStrategy, proctimeAttribute, ignoreErrors, 
innerFormat);
     }
 
@@ -247,7 +247,7 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
                 && Objects.equals(valueDecodingFormat, 
that.valueDecodingFormat)
                 && Objects.equals(masterAddress, that.masterAddress)
                 && Objects.equals(topic, that.topic)
-                && Objects.equals(String.valueOf(tidSet), 
String.valueOf(that.tidSet))
+                && Objects.equals(String.valueOf(streamIdSet), String.valueOf(that.streamIdSet))
                 && Objects.equals(consumerGroup, that.consumerGroup)
                 && Objects.equals(proctimeAttribute, that.proctimeAttribute)
                 && Objects.equals(watermarkStrategy, that.watermarkStrategy);
@@ -260,7 +260,7 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
                 valueDecodingFormat,
                 masterAddress,
                 topic,
-                tidSet,
+                streamIdSet,
                 consumerGroup,
                 configuration,
                 watermarkStrategy,
@@ -302,7 +302,7 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
         final DeserializationSchema<RowData> tubeMQDeserializer = new 
DynamicTubeMQDeserializationSchema(
                 deserialization, metadataConverters, producedTypeInfo, 
ignoreErrors);
 
-        final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new 
FlinkTubeMQConsumer(masterAddress, topic, tidSet,
+        final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
                 consumerGroup, tubeMQDeserializer, configuration, sessionKey, 
innerFormat);
         return tubeMQConsumer;
     }
diff --git a/inlong-tubemq/tubemq-client/pom.xml b/inlong-tubemq/tubemq-client/pom.xml
index b6055071d0..cff6081658 100644
--- a/inlong-tubemq/tubemq-client/pom.xml
+++ b/inlong-tubemq/tubemq-client/pom.xml
@@ -94,6 +94,7 @@
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>exec-maven-plugin</artifactId>
+                <version>${exec.maven.version}</version>
                 <executions>
                     <execution>
                         <id>version</id>
@@ -114,6 +115,7 @@
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
+                <version>${build.helper.maven.version}</version>
                 <executions>
                     <execution>
                         <goals>
diff --git a/inlong-tubemq/tubemq-server/pom.xml b/inlong-tubemq/tubemq-server/pom.xml
index 6c463a758e..0d2136e0cd 100644
--- a/inlong-tubemq/tubemq-server/pom.xml
+++ b/inlong-tubemq/tubemq-server/pom.xml
@@ -237,6 +237,7 @@
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>exec-maven-plugin</artifactId>
+                <version>${exec.maven.version}</version>
                 <executions>
                     <execution>
                         <id>version</id>


Reply via email to