This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a0b69d6f58 [INLONG-9087][Sort] TubeMQ Connector use latest offset mode (#9088)
a0b69d6f58 is described below

commit a0b69d6f589ad72fcb31e26ffeec070da0b0c863
Author: vernedeng <verned...@apache.org>
AuthorDate: Mon Oct 23 18:32:42 2023 +0800

    [INLONG-9087][Sort] TubeMQ Connector use latest offset mode (#9088)
---
 .../main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java    | 2 +-
 .../main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index e3d2674e8a..a4e39946c0 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -211,7 +211,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
     public void open(Configuration parameters) throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
         consumerConfig.setConsumePosition(consumeFromMax
-                ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS
+                ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
                 : ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
 
         consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index 0180f90e02..5d7e8b27fa 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -209,7 +209,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
     public void open(Configuration parameters) throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
         consumerConfig.setConsumePosition(consumeFromMax
-                ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS
+                ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
                 : ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
 
         consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());

Reply via email to