This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0278f0c143 [INLONG-9128][Sort] Fix the failed to init TubeMQ source with InlongMsg type message (#9129)
0278f0c143 is described below

commit 0278f0c1435e6a3d9266ac7a0dd8ebd3fa5cf74d
Author: vernedeng <verned...@apache.org>
AuthorDate: Fri Oct 27 11:02:30 2023 +0800

    [INLONG-9128][Sort] Fix the failed to init TubeMQ source with InlongMsg type message (#9129)
---
 .../java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java   | 4 +++-
 .../inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java       | 8 +++-----
 .../java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java   | 4 +++-
 .../inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java       | 4 ++--
 4 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index a4e39946c0..96c27d2b9b 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -209,6 +209,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
 
     @Override
     public void open(Configuration parameters) throws Exception {
+
         ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
         consumerConfig.setConsumePosition(consumeFromMax
                 ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
@@ -220,7 +221,8 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
         messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
         messagePullConsumer.subscribe(topic, tidSet);
-        messagePullConsumer.completeSubscribe(sessionKey, numTasks, true, currentOffsets);
+        String jobId = getRuntimeContext().getJobId().toString();
+        messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets);
 
         running = true;
     }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 8aad010fba..43fb3e198e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -35,9 +35,7 @@ import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
-import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
@@ -60,7 +58,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory {
 
     public static final String IDENTIFIER = "tubemq-inlong";
 
-    public static final List<String> INNERFORMATTYPE = Arrays.asList("inlong-msg");
+    public static final String INNERFORMATTYPE = "inlong-msg";
 
     public static boolean innerFormat = false;
 
@@ -76,7 +74,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory {
                 && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
             Configuration options = Configuration.fromMap(catalogTable.getOptions());
             String formatName = options.getOptional(FORMAT).orElse(options.get(FORMAT));
-            innerFormat = INNERFORMATTYPE.contains(formatName);
+            innerFormat = INNERFORMATTYPE.equals(formatName);
             throw new ValidationException(String.format(
                     "The TubeMQ table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
                             + " on the table, because it can't guarantee the semantic of primary key.",
@@ -109,7 +107,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory {
         final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = getValueDecodingFormat(helper);
 
         // validate all options
-        helper.validate();
+        helper.validateExcept(INNERFORMATTYPE);
 
         validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index 5d7e8b27fa..cafa653f65 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -207,6 +207,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
 
     @Override
     public void open(Configuration parameters) throws Exception {
+
         ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
         consumerConfig.setConsumePosition(consumeFromMax
                 ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
@@ -218,7 +219,8 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
         messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
         messagePullConsumer.subscribe(topic, streamIdSet);
-        messagePullConsumer.completeSubscribe(sessionKey, numTasks, true, currentOffsets);
+        String jobId = getRuntimeContext().getJobId().toString();
+        messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets);
 
         running = true;
     }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 81b6709418..0472353037 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -117,7 +117,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn
         final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = getValueDecodingFormat(helper);
 
         // validate all options
-        helper.validate();
+        helper.validateExcept(INNERFORMATTYPE);
 
         validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
         innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT));
@@ -146,7 +146,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn
         final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = getValueEncodingFormat(helper);
 
         // validate all options
-        helper.validate();
+        helper.validateExcept(INNERFORMATTYPE);
 
         validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
 

Reply via email to