This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 1576ae2  [ISSUE #103] Fix seek offset and deserialize bug in newSource (#104)
1576ae2 is described below

commit 1576ae2d4d51604a464df0fb21074f67c9e50c0f
Author: hejunjie <844028...@qq.com>
AuthorDate: Mon Dec 4 14:34:52 2023 +0800

    [ISSUE #103] Fix seek offset and deserialize bug in newSource (#104)
---
 pom.xml                                                     |  1 +
 .../flink/connector/rocketmq/source/InnerConsumerImpl.java  | 13 ++++++++++---
 .../enumerator/RocketMQSourceEnumStateSerializer.java       |  5 ++---
 .../source/enumerator/RocketMQSourceEnumerator.java         | 11 ++++++-----
 .../rocketmq/source/reader/RocketMQSplitReader.java         |  3 ---
 5 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/pom.xml b/pom.xml
index 67f322d..2eddd1a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
         <junit.version>4.13.2</junit.version>
         <junit-jupiter.version>5.9.2</junit-jupiter.version>
         <powermock.version>1.7.4</powermock.version>
+        <jaxb-api.version>2.3.1</jaxb-api.version>
     </properties>
 
     <dependencies>
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
index edc438b..317031f 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.rocketmq.source;
 
+import com.alibaba.fastjson.JSON;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
@@ -25,8 +26,6 @@ import 
org.apache.flink.connector.rocketmq.source.reader.MessageViewExt;
 import org.apache.flink.connector.rocketmq.source.util.UtilAll;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.StringUtils;
-
-import com.alibaba.fastjson.JSON;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
@@ -240,7 +239,15 @@ public class InnerConsumerImpl implements InnerConsumer {
                         long offset =
                                 consumer.getOffsetStore()
                                         .readOffset(messageQueue, 
ReadOffsetType.READ_FROM_STORE);
-                        LOG.error(
+
+                        if (offset == -1) {
+                            offset = adminExt.minOffset(messageQueue);
+                            LOG.info(
+                                    "Consumer seek committed offset from 
remote, offset=-1,mq={},use minOffset={}",
+                                    UtilAll.getQueueDescription(messageQueue),
+                                    offset);
+                        }
+                        LOG.info(
                                 "Consumer seek committed offset from remote, 
mq={}, offset={}",
                                 UtilAll.getQueueDescription(messageQueue),
                                 offset);
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
index 7589ba4..805df1b 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.connector.rocketmq.source.enumerator;
 
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
 import com.alibaba.fastjson.JSON;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,7 +92,7 @@ public class RocketMQSourceEnumStateSerializer
                 String topic = in.readUTF();
                 int queueId = in.readInt();
 
-                MessageQueue queue = new MessageQueue(brokerName, topic, 
queueId);
+                MessageQueue queue = new MessageQueue(topic, brokerName, 
queueId);
                 result.add(queue);
             }
 
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
index 6103444..77c0c33 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.rocketmq.source.enumerator;
 
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Sets;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -34,15 +36,11 @@ import 
org.apache.flink.connector.rocketmq.source.enumerator.allocate.AllocateSt
 import 
org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
 import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
 import org.apache.flink.util.FlinkRuntimeException;
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.Sets;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -166,6 +164,9 @@ public class RocketMQSourceEnumerator
      */
     @Override
     public void addSplitsBack(List<RocketMQSourceSplit> splits, int subtaskId) 
{
+        SourceSplitChangeResult sourceSplitChangeResult =
+                new SourceSplitChangeResult(new HashSet<>(splits));
+        this.calculateSplitAssignment(sourceSplitChangeResult);
         // If the failed subtask has already restarted, we need to assign 
splits to it
         if (context.registeredReaders().containsKey(subtaskId)) {
             sendSplitChangesToRemote(Collections.singleton(subtaskId));
@@ -321,7 +322,7 @@ public class RocketMQSourceEnumerator
             }
 
             final Set<RocketMQSourceSplit> pendingAssignmentForReader =
-                    this.pendingSplitAssignmentMap.get(pendingReader);
+                    this.pendingSplitAssignmentMap.remove(pendingReader);
 
             // Put pending assignment into incremental assignment
             if (pendingAssignmentForReader != null && 
!pendingAssignmentForReader.isEmpty()) {
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
index b0bca4a..3a07966 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
@@ -36,13 +36,11 @@ import 
org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
 import org.apache.flink.connector.rocketmq.source.util.UtilAll;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
-
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -190,7 +188,6 @@ public class RocketMQSplitReader<T> implements 
SplitReader<MessageView, RocketMQ
     public void wakeUp() {
         LOG.debug("Wake up the split reader in case the fetcher thread is 
blocking in fetch().");
         wakeup = true;
-        this.consumer.wakeup();
     }
 
     @Override

Reply via email to