This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a0b69d6f58 [INLONG-9087][Sort] TubeMQ Connector use latest offset mode (#9088) a0b69d6f58 is described below commit a0b69d6f589ad72fcb31e26ffeec070da0b0c863 Author: vernedeng <verned...@apache.org> AuthorDate: Mon Oct 23 18:32:42 2023 +0800 [INLONG-9087][Sort] TubeMQ Connector use latest offset mode (#9088) --- .../main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 2 +- .../main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index e3d2674e8a..a4e39946c0 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -211,7 +211,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> public void open(Configuration parameters) throws Exception { ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup); consumerConfig.setConsumePosition(consumeFromMax - ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS + ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET : ConsumePosition.CONSUMER_FROM_FIRST_OFFSET); consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis()); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index 0180f90e02..5d7e8b27fa 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -209,7 +209,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> public void open(Configuration parameters) throws Exception { ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup); consumerConfig.setConsumePosition(consumeFromMax - ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS + ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET : ConsumePosition.CONSUMER_FROM_FIRST_OFFSET); consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());