This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 0278f0c143 [INLONG-9128][Sort] Fix the failed to init TubeMQ source with InlongMsg type message (#9129) 0278f0c143 is described below commit 0278f0c1435e6a3d9266ac7a0dd8ebd3fa5cf74d Author: vernedeng <verned...@apache.org> AuthorDate: Fri Oct 27 11:02:30 2023 +0800 [INLONG-9128][Sort] Fix the failed to init TubeMQ source with InlongMsg type message (#9129) --- .../java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 4 +++- .../inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java | 8 +++----- .../java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 4 +++- .../inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java | 4 ++-- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index a4e39946c0..96c27d2b9b 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -209,6 +209,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> @Override public void open(Configuration parameters) throws Exception { + ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup); consumerConfig.setConsumePosition(consumeFromMax ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET @@ -220,7 +221,8 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig); messagePullConsumer.subscribe(topic, tidSet); - messagePullConsumer.completeSubscribe(sessionKey, numTasks, true, currentOffsets); + String jobId = getRuntimeContext().getJobId().toString(); + messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets); running = true; } diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java index 8aad010fba..43fb3e198e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java @@ -35,9 +35,7 @@ import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; -import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Optional; import java.util.Set; import java.util.TreeSet; @@ -60,7 +58,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory { public static final String IDENTIFIER = "tubemq-inlong"; - public static final List<String> INNERFORMATTYPE = Arrays.asList("inlong-msg"); + public static final String INNERFORMATTYPE = "inlong-msg"; public static boolean innerFormat = false; @@ -76,7 +74,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory { && format.getChangelogMode().containsOnly(RowKind.INSERT)) { Configuration options = Configuration.fromMap(catalogTable.getOptions()); String formatName = options.getOptional(FORMAT).orElse(options.get(FORMAT)); - innerFormat = INNERFORMATTYPE.contains(formatName); + innerFormat = INNERFORMATTYPE.equals(formatName); throw new ValidationException(String.format( "The TubeMQ table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint" + " on the table, because it can't guarantee the semantic of primary key.", @@ -109,7 +107,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory { final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = getValueDecodingFormat(helper); // validate all options - helper.validate(); + helper.validateExcept(INNERFORMATTYPE); validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index 5d7e8b27fa..cafa653f65 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -207,6 +207,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> @Override public void open(Configuration parameters) throws Exception { + ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup); consumerConfig.setConsumePosition(consumeFromMax ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET @@ -218,7 +219,8 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig); messagePullConsumer.subscribe(topic, streamIdSet); - messagePullConsumer.completeSubscribe(sessionKey, numTasks, true, currentOffsets); + String jobId = getRuntimeContext().getJobId().toString(); + messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets); running = true; } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java index 81b6709418..0472353037 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java @@ -117,7 +117,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = getValueDecodingFormat(helper); // validate all options - helper.validate(); + helper.validateExcept(INNERFORMATTYPE); validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat); innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT)); @@ -146,7 +146,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = getValueEncodingFormat(helper); // validate all options - helper.validate(); + helper.validateExcept(INNERFORMATTYPE); validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);