This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 0e60e726aa [INLONG-9077][Sort] Fix TubeMQ connector fail to subscribe streamId (#9078) 0e60e726aa is described below commit 0e60e726aa2531f63f6314e6c0430157859bc1db Author: vernedeng <verned...@apache.org> AuthorDate: Fri Oct 20 17:29:28 2023 +0800 [INLONG-9077][Sort] Fix TubeMQ connector fail to subscribe streamId (#9078) --- inlong-distribution/pom.xml | 2 +- .../pojo/sort/node/provider/TubeMqProvider.java | 4 ++-- .../manager/pojo/source/tubemq/TubeMQSource.java | 6 +++--- .../source/tubemq/TubeMQSourceOperator.java | 2 +- .../sort/protocol/constant/TubeMQConstant.java | 6 +++--- .../protocol/node/extract/TubeMQExtractNode.java | 24 +++++++++++----------- inlong-sort/sort-core/pom.xml | 6 ++++++ .../tubemq/table/TubeMQDynamicTableFactory.java | 8 ++++---- .../inlong/sort/tubemq/table/TubeMQOptions.java | 19 +++++++++++------ .../sort-connectors/pulsar/pom.xml | 2 ++ .../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 14 ++++++------- .../inlong/sort/tubemq/FlinkTubeMQProducer.java | 10 ++++----- .../tubemq/table/TubeMQDynamicTableFactory.java | 4 ++-- .../inlong/sort/tubemq/table/TubeMQOptions.java | 8 ++++---- .../inlong/sort/tubemq/table/TubeMQTableSink.java | 14 ++++++------- .../sort/tubemq/table/TubeMQTableSource.java | 18 ++++++++-------- inlong-tubemq/tubemq-client/pom.xml | 2 ++ inlong-tubemq/tubemq-server/pom.xml | 1 + 18 files changed, 84 insertions(+), 66 deletions(-) diff --git a/inlong-distribution/pom.xml b/inlong-distribution/pom.xml index 5107b1a0fa..14b4a30804 100644 --- a/inlong-distribution/pom.xml +++ b/inlong-distribution/pom.xml @@ -73,7 +73,7 @@ <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> - <version>3.1.0</version> + <version>${exec.maven.version}</version> <configuration> <executable>${basedir}/script/backup_module_dependencys.sh</executable> </configuration> diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java index 64b0c0a71f..daaf5a94fb 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java @@ -53,9 +53,9 @@ public class TubeMqProvider implements ExtractNodeProvider { source.getMasterRpc(), source.getTopic(), source.getSerializationType(), - source.getGroupId(), + source.getConsumeGroup(), source.getSessionKey(), - source.getTid(), + source.getStreamId(), source.getInnerFormat()); } } \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java index 0a0e49127f..a295b0329e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java @@ -52,7 +52,7 @@ public class TubeMQSource extends StreamSource { private String topic; @ApiModelProperty("Group of the TubeMQ") - private String groupId; + private String consumeGroup; @ApiModelProperty("Session key of the TubeMQ") private String sessionKey; @@ -61,10 +61,10 @@ public class TubeMQSource extends StreamSource { private String innerFormat; /** - * The TubeMQ consumers use this tid set to filter records reading from server. + * The TubeMQ consumers use this streamId set to filter records reading from server. */ @ApiModelProperty("Tid of the TubeMQ") - private TreeSet<String> tid; + private TreeSet<String> streamId; public TubeMQSource() { this.setSourceType(SourceType.TUBEMQ); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java index 7fc56539d9..df7af84b09 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java @@ -107,7 +107,7 @@ public class TubeMQSourceOperator extends AbstractSourceOperator { String streamId = streamInfo.getInlongStreamId(); tubeMQSource.setSourceName(streamId); tubeMQSource.setTopic(groupInfo.getMqResource()); - tubeMQSource.setGroupId(streamId); + tubeMQSource.setConsumeGroup(streamId); tubeMQSource.setMasterRpc(masterRpc); tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError()); diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java index 871e3eba41..eee84f1186 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java @@ -24,7 +24,7 @@ public class TubeMQConstant { public static final String TOPIC = "topic"; - public static final String GROUP_ID = "group.id"; + public static final String CONSUME_GROUP = "consume.group"; public static final String CONNECTOR = "connector"; @@ -38,9 +38,9 @@ public class TubeMQConstant { public static final String SESSION_KEY = "session.key"; /** - * The tubemq consumers use this tid set to filter records reading from server. + * The tubemq consumers use this streamId set to filter records reading from server. */ - public static final String TID = "tid"; + public static final String STREAMID = "stream.id"; public static final String CONSUMER_STARTUP_MODE = "consumer.startup.mode"; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java index bbcdbbecd5..4c55adb3de 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java @@ -61,17 +61,17 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable { private String format; @Nonnull - @JsonProperty("groupId") - private String groupId; + @JsonProperty("consumeGroup") + private String consumeGroup; @JsonProperty("sessionKey") private String sessionKey; /** - * The tubemq consumers use this tid set to filter records reading from server. + * The tubemq consumers use this streamId set to filter records reading from server. */ - @JsonProperty("tid") - private TreeSet<String> tid; + @JsonProperty("streamId") + private TreeSet<String> streamId; @JsonProperty("inlong-msg.inner.format") private String innerFormat; @@ -86,18 +86,18 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable { @Nonnull @JsonProperty("masterRpc") String masterRpc, @Nonnull @JsonProperty("topic") String topic, @Nonnull @JsonProperty("format") String format, - @Nonnull @JsonProperty("groupId") String groupId, + @Nonnull @JsonProperty("consumeGroup") String consumeGroup, @JsonProperty("sessionKey") String sessionKey, - @JsonProperty("tid") TreeSet<String> tid, + @JsonProperty("streamId") TreeSet<String> streamId, @JsonProperty("inlong-msg.inner.format") String innerFormat) { super(id, name, fields, waterMarkField, properties); this.masterRpc = Preconditions.checkNotNull(masterRpc, "TubeMQ masterRpc is null"); this.topic = Preconditions.checkNotNull(topic, "TubeMQ topic is null"); this.format = Preconditions.checkNotNull(format, "Format is null"); - this.groupId = Preconditions.checkNotNull(groupId, "Group id is null"); + this.consumeGroup = Preconditions.checkNotNull(consumeGroup, "Group id is null"); this.sessionKey = sessionKey; + this.streamId = streamId; this.innerFormat = innerFormat; - this.tid = tid; } @Override @@ -106,15 +106,15 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable { map.put(TubeMQConstant.CONNECTOR, TubeMQConstant.TUBEMQ); map.put(TubeMQConstant.TOPIC, topic); map.put(TubeMQConstant.MASTER_RPC, masterRpc); - map.put(TubeMQConstant.GROUP_ID, groupId); + map.put(TubeMQConstant.CONSUME_GROUP, consumeGroup); map.put(TubeMQConstant.FORMAT, format); map.put(TubeMQConstant.SESSION_KEY, sessionKey); if (format.startsWith(INLONG_MSG)) { map.put(TubeMQConstant.INNER_FORMAT, innerFormat); } - if (null != tid && !tid.isEmpty()) { - map.put(TubeMQConstant.TID, StringUtils.concatCsv(tid.toArray(new String[0]), + if (null != streamId && !streamId.isEmpty()) { + map.put(TubeMQConstant.STREAMID, StringUtils.concatCsv(streamId.toArray(new String[0]), ',', null, null)); } diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml index 50b41d6833..c463785605 100644 --- a/inlong-sort/sort-core/pom.xml +++ b/inlong-sort/sort-core/pom.xml @@ -245,6 +245,12 @@ <profile> <id>v1.15</id> <dependencies> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-tubemq-v1.15</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.inlong</groupId> <artifactId>sort-connector-postgres-cdc-v1.15</artifactId> diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java index 7101a475a7..8aad010fba 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java @@ -44,11 +44,11 @@ import java.util.TreeSet; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_ID; +import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TID; +import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.STREAMID; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC_PATTERN; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQProperties; @@ -169,8 +169,8 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory { final Set<ConfigOption<?>> options = new HashSet<>(); options.add(FORMAT); options.add(TOPIC); - options.add(GROUP_ID); - options.add(TID); + options.add(CONSUME_GROUP); + options.add(STREAMID); options.add(SESSION_KEY); options.add(BOOTSTRAP_FROM_MAX); options.add(TOPIC_PATTERN); diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java index ba069094bb..357da2dd09 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java @@ -87,6 +87,13 @@ public class TubeMQOptions { // TubeMQ specific options // -------------------------------------------------------------------------------------------- + public static final ConfigOption<String> INNER_FORMAT = + ConfigOptions.key("inlong-msg.inner.format") + .stringType() + .noDefaultValue() + .withDescription( + "Inner format"); + public static final ConfigOption<String> TOPIC = ConfigOptions.key("topic") .stringType() @@ -109,8 +116,8 @@ public class TubeMQOptions { .noDefaultValue() .withDescription("Required TubeMQ master connection string"); - public static final ConfigOption<String> GROUP_ID = - ConfigOptions.key("group.id") + public static final ConfigOption<String> CONSUME_GROUP = + ConfigOptions.key("consume.group") .stringType() .noDefaultValue() .withDescription( @@ -140,8 +147,8 @@ public class TubeMQOptions { .defaultValue("default_session_key") .withDescription("The session key for this consumer group at startup."); - public static final ConfigOption<List<String>> TID = - ConfigOptions.key("topic.tid") + public static final ConfigOption<List<String>> STREAMID = + ConfigOptions.key("stream.id") .stringType() .asList() .noDefaultValue() @@ -385,7 +392,7 @@ public class TubeMQOptions { public static TreeSet<String> getTiSet(ReadableConfig tableOptions) { TreeSet<String> set = new TreeSet<>(); - tableOptions.getOptional(TID).ifPresent(new Consumer<List<String>>() { + tableOptions.getOptional(STREAMID).ifPresent(new Consumer<List<String>>() { @Override public void accept(List<String> strings) { @@ -396,7 +403,7 @@ public class TubeMQOptions { } public static String getConsumerGroup(ReadableConfig tableOptions) { - return tableOptions.getOptional(GROUP_ID).orElse(null); + return tableOptions.getOptional(CONSUME_GROUP).orElse(null); } public static String getSessionKey(ReadableConfig tableOptions) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml index e01c3278fa..501d39c637 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml @@ -130,6 +130,7 @@ <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> + <version>${protobuf.maven.plugin.version}</version> <extensions>true</extensions> <configuration> <!-- Currently Flink azure test pipeline would first pre-compile and then upload the compiled @@ -157,6 +158,7 @@ <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>build-helper-maven-plugin</artifactId> + <version>${build.helper.maven.version}</version> <executions> <execution> <id>add-test-source</id> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index b890561681..0180f90e02 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -80,9 +80,9 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> private final String topic; /** - * The tubemq consumers use this tid set to filter records reading from server. + * The tubemq consumers use this streamId set to filter records reading from server. */ - private final TreeSet<String> tidSet; + private final TreeSet<String> streamIdSet; /** * The consumer group name. @@ -147,7 +147,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> * * @param masterAddress the master address of TubeMQ * @param topic the topic name - * @param tidSet the topic's filter condition items + * @param streamIdSet the topic's filter condition items * @param consumerGroup the consumer group name * @param deserializationSchema the deserialize schema * @param configuration the configure @@ -156,7 +156,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> public FlinkTubeMQConsumer( String masterAddress, String topic, - TreeSet<String> tidSet, + TreeSet<String> streamIdSet, String consumerGroup, DeserializationSchema<T> deserializationSchema, Configuration configuration, @@ -164,14 +164,14 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> Boolean innerFormat) { checkNotNull(masterAddress, "The master address must not be null."); checkNotNull(topic, "The topic must not be null."); - checkNotNull(tidSet, "The tid set must not be null."); + checkNotNull(streamIdSet, "The streamId set must not be null."); checkNotNull(consumerGroup, "The consumer group must not be null."); checkNotNull(deserializationSchema, "The deserialization schema must not be null."); checkNotNull(configuration, "The configuration must not be null."); this.masterAddress = masterAddress; this.topic = topic; - this.tidSet = tidSet; + this.streamIdSet = streamIdSet; this.consumerGroup = consumerGroup; this.deserializationSchema = deserializationSchema; this.sessionKey = sessionKey; @@ -217,7 +217,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks(); messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig); - messagePullConsumer.subscribe(topic, tidSet); + messagePullConsumer.subscribe(topic, streamIdSet); messagePullConsumer.completeSubscribe(sessionKey, numTasks, true, currentOffsets); running = true; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java index fb2f624961..3f04594b67 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java @@ -60,9 +60,9 @@ public class FlinkTubeMQProducer<T> extends RichSinkFunction<T> implements Check private final String topic; /** - * The tubemq consumers use this tid set to filter records reading from server. + * The tubemq consumers use this streamId set to filter records reading from server. */ - private final TreeSet<String> tidSet; + private final TreeSet<String> streamIdSet; /** * The serializer for the records sent to tube. */ @@ -86,12 +86,12 @@ public class FlinkTubeMQProducer<T> extends RichSinkFunction<T> implements Check public FlinkTubeMQProducer(String topic, String masterAddress, SerializationSchema<T> serializationSchema, - TreeSet<String> tidSet, + TreeSet<String> streamIdSet, Configuration configuration) { checkNotNull(topic, "The topic must not be null."); checkNotNull(masterAddress, "The master address must not be null."); checkNotNull(serializationSchema, "The serialization schema must not be null."); - checkNotNull(tidSet, "The tid set must not be null."); + checkNotNull(streamIdSet, "The streamId set must not be null."); checkNotNull(configuration, "The configuration must not be null."); int max_retries = configuration.getInteger(TubeMQOptions.MAX_RETRIES); @@ -100,7 +100,7 @@ public class FlinkTubeMQProducer<T> extends RichSinkFunction<T> implements Check this.topic = topic; this.masterAddress = masterAddress; this.serializationSchema = serializationSchema; - this.tidSet = tidSet; + this.streamIdSet = streamIdSet; this.maxRetries = max_retries; } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java index f6130b5288..81b6709418 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java @@ -47,7 +47,7 @@ import java.util.TreeSet; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_NAME; +import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY; @@ -220,7 +220,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn final Set<ConfigOption<?>> options = new HashSet<>(); options.add(FORMAT); options.add(TOPIC); - options.add(GROUP_NAME); + options.add(CONSUME_GROUP); options.add(STREAMID); options.add(SESSION_KEY); options.add(BOOTSTRAP_FROM_MAX); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java index 76f85f0563..0b4ea93978 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java @@ -106,8 +106,8 @@ public class TubeMQOptions { .noDefaultValue() .withDescription("Required TubeMQ master connection string"); - public static final ConfigOption<String> GROUP_NAME = - ConfigOptions.key("group.name") + public static final ConfigOption<String> CONSUME_GROUP = + ConfigOptions.key("consume.group") .stringType() .noDefaultValue() .withDescription( @@ -138,7 +138,7 @@ public class TubeMQOptions { .withDescription("The session key for this consumer group at startup."); public static final ConfigOption<List<String>> STREAMID = - ConfigOptions.key("topic.streamId") + ConfigOptions.key("stream.id") .stringType() .asList() .noDefaultValue() @@ -275,7 +275,7 @@ public class TubeMQOptions { } public static String getConsumerGroup(ReadableConfig tableOptions) { - return tableOptions.getOptional(GROUP_NAME).orElse(null); + return tableOptions.getOptional(CONSUME_GROUP).orElse(null); } public static String getSessionKey(ReadableConfig tableOptions) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java index 5d2f8c2a4d..cf5109d82f 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java @@ -50,9 +50,9 @@ public class TubeMQTableSink implements DynamicTableSink { */ private final String masterAddress; /** - * The TubeMQ tid filter collection. + * The TubeMQ streamId filter collection. */ - private final TreeSet<String> tidSet; + private final TreeSet<String> streamIdSet; /** * The parameters collection for tubemq producer. */ @@ -63,20 +63,20 @@ public class TubeMQTableSink implements DynamicTableSink { EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat, String topic, String masterAddress, - TreeSet<String> tidSet, + TreeSet<String> streamIdSet, Configuration configuration) { Preconditions.checkNotNull(valueEncodingFormat, "The serialization schema must not be null."); Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null."); Preconditions.checkNotNull(topic, "Topic must not be null."); Preconditions.checkNotNull(masterAddress, "Master address must not be null."); Preconditions.checkNotNull(configuration, "The configuration must not be null."); - Preconditions.checkNotNull(tidSet, "The tid set must not be null."); + Preconditions.checkNotNull(streamIdSet, "The streamId set must not be null."); this.valueEncodingFormat = valueEncodingFormat; this.physicalDataType = physicalDataType; this.topic = topic; this.masterAddress = masterAddress; - this.tidSet = tidSet; + this.streamIdSet = streamIdSet; this.configuration = configuration; } @@ -102,7 +102,7 @@ public class TubeMQTableSink implements DynamicTableSink { SerializationSchema<RowData> serializationSchema, Configuration configuration) { final FlinkTubeMQProducer<RowData> tubeMQProducer = - new FlinkTubeMQProducer(topic, masterAddress, serializationSchema, tidSet, configuration); + new FlinkTubeMQProducer(topic, masterAddress, serializationSchema, streamIdSet, configuration); return tubeMQProducer; } @@ -120,7 +120,7 @@ public class TubeMQTableSink implements DynamicTableSink { valueEncodingFormat, topic, masterAddress, - tidSet, + streamIdSet, configuration); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java index 2dc21ca2e8..e79685ff70 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java @@ -84,9 +84,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada */ private final String topic; /** - * The TubeMQ tid filter collection. + * The TubeMQ streamId filter collection. */ - private final TreeSet<String> tidSet; + private final TreeSet<String> streamIdSet; /** * The TubeMQ consumer group name. */ @@ -129,7 +129,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada public TubeMQTableSource(DataType physicalDataType, DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, String masterAddress, String topic, - TreeSet<String> tidSet, String consumerGroup, String sessionKey, + TreeSet<String> streamIdSet, String consumerGroup, String sessionKey, Configuration configuration, @Nullable WatermarkStrategy<RowData> watermarkStrategy, Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat) { @@ -137,7 +137,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada Preconditions.checkNotNull(valueDecodingFormat, "The deserialization schema must not be null."); Preconditions.checkNotNull(masterAddress, "The master address must not be null."); Preconditions.checkNotNull(topic, "The topic must not be null."); - Preconditions.checkNotNull(tidSet, "The tid set must not be null."); + Preconditions.checkNotNull(streamIdSet, "The streamId set must not be null."); Preconditions.checkNotNull(consumerGroup, "The consumer group must not be null."); Preconditions.checkNotNull(configuration, "The configuration must not be null."); @@ -147,7 +147,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada this.valueDecodingFormat = valueDecodingFormat; this.masterAddress = masterAddress; this.topic = topic; - this.tidSet = tidSet; + this.streamIdSet = streamIdSet; this.consumerGroup = consumerGroup; this.sessionKey = sessionKey; this.configuration = configuration; @@ -182,7 +182,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada public DynamicTableSource copy() { return new TubeMQTableSource( physicalDataType, valueDecodingFormat, masterAddress, - topic, tidSet, consumerGroup, sessionKey, configuration, + topic, streamIdSet, consumerGroup, sessionKey, configuration, watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat); } @@ -247,7 +247,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada && Objects.equals(valueDecodingFormat, that.valueDecodingFormat) && Objects.equals(masterAddress, that.masterAddress) && Objects.equals(topic, that.topic) - && Objects.equals(String.valueOf(tidSet), String.valueOf(that.tidSet)) + && Objects.equals(String.valueOf(streamIdSet), String.valueOf(that.streamIdSet)) && Objects.equals(consumerGroup, that.consumerGroup) && Objects.equals(proctimeAttribute, that.proctimeAttribute) && Objects.equals(watermarkStrategy, that.watermarkStrategy); @@ -260,7 +260,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada valueDecodingFormat, masterAddress, topic, - tidSet, + streamIdSet, consumerGroup, configuration, watermarkStrategy, @@ -302,7 +302,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada final DeserializationSchema<RowData> tubeMQDeserializer = new DynamicTubeMQDeserializationSchema( deserialization, metadataConverters, producedTypeInfo, ignoreErrors); - final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, tidSet, + final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, streamIdSet, consumerGroup, tubeMQDeserializer, configuration, sessionKey, innerFormat); return tubeMQConsumer; } diff --git a/inlong-tubemq/tubemq-client/pom.xml b/inlong-tubemq/tubemq-client/pom.xml index b6055071d0..cff6081658 100644 --- a/inlong-tubemq/tubemq-client/pom.xml +++ b/inlong-tubemq/tubemq-client/pom.xml @@ -94,6 +94,7 @@ <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> + <version>${exec.maven.version}</version> <executions> <execution> <id>version</id> @@ -114,6 +115,7 @@ <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>build-helper-maven-plugin</artifactId> + <version>${build.helper.maven.version}</version> <executions> <execution> <goals> diff --git a/inlong-tubemq/tubemq-server/pom.xml b/inlong-tubemq/tubemq-server/pom.xml index 6c463a758e..0d2136e0cd 100644 --- a/inlong-tubemq/tubemq-server/pom.xml +++ b/inlong-tubemq/tubemq-server/pom.xml @@ -237,6 +237,7 @@ <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> + <version>${exec.maven.version}</version> <executions> <execution> <id>version</id>