This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push: new 1576ae2 [ISSUE #103] Fix seek offset and deserialize bug in newSource (#104) 1576ae2 is described below commit 1576ae2d4d51604a464df0fb21074f67c9e50c0f Author: hejunjie <844028...@qq.com> AuthorDate: Mon Dec 4 14:34:52 2023 +0800 [ISSUE #103] Fix seek offset and deserialize bug in newSource (#104) --- pom.xml | 1 + .../flink/connector/rocketmq/source/InnerConsumerImpl.java | 13 ++++++++++--- .../enumerator/RocketMQSourceEnumStateSerializer.java | 5 ++--- .../source/enumerator/RocketMQSourceEnumerator.java | 11 ++++++----- .../rocketmq/source/reader/RocketMQSplitReader.java | 3 --- 5 files changed, 19 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index 67f322d..2eddd1a 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,7 @@ <junit.version>4.13.2</junit.version> <junit-jupiter.version>5.9.2</junit-jupiter.version> <powermock.version>1.7.4</powermock.version> + <jaxb-api.version>2.3.1</jaxb-api.version> </properties> <dependencies> diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java index edc438b..317031f 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java @@ -17,6 +17,7 @@ package org.apache.flink.connector.rocketmq.source; +import com.alibaba.fastjson.JSON; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector; @@ -25,8 +26,6 @@ import org.apache.flink.connector.rocketmq.source.reader.MessageViewExt; import org.apache.flink.connector.rocketmq.source.util.UtilAll; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.StringUtils; - -import com.alibaba.fastjson.JSON; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; @@ -240,7 +239,15 @@ public class InnerConsumerImpl implements InnerConsumer { long offset = consumer.getOffsetStore() .readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE); - LOG.error( + + if (offset == -1) { + offset = adminExt.minOffset(messageQueue); + LOG.info( + "Consumer seek committed offset from remote, offset=-1,mq={},use minOffset={}", + UtilAll.getQueueDescription(messageQueue), + offset); + } + LOG.info( "Consumer seek committed offset from remote, mq={}, offset={}", UtilAll.getQueueDescription(messageQueue), offset); diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java index 7589ba4..805df1b 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java @@ -18,9 +18,8 @@ package org.apache.flink.connector.rocketmq.source.enumerator; -import org.apache.flink.core.io.SimpleVersionedSerializer; - import com.alibaba.fastjson.JSON; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.rocketmq.common.message.MessageQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,7 +92,7 @@ public class RocketMQSourceEnumStateSerializer String topic = in.readUTF(); int queueId = in.readInt(); - MessageQueue queue = new MessageQueue(brokerName, topic, queueId); + MessageQueue queue = new MessageQueue(topic, brokerName, queueId); result.add(queue); } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java index 6103444..77c0c33 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.rocketmq.source.enumerator; +import com.alibaba.fastjson.JSON; +import com.google.common.collect.Sets; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.Boundedness; @@ -34,15 +36,11 @@ import org.apache.flink.connector.rocketmq.source.enumerator.allocate.AllocateSt import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector; import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit; import org.apache.flink.util.FlinkRuntimeException; - -import com.alibaba.fastjson.JSON; -import com.google.common.collect.Sets; import org.apache.rocketmq.common.message.MessageQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -166,6 +164,9 @@ public class RocketMQSourceEnumerator */ @Override public void addSplitsBack(List<RocketMQSourceSplit> splits, int subtaskId) { + SourceSplitChangeResult sourceSplitChangeResult = + new SourceSplitChangeResult(new HashSet<>(splits)); + this.calculateSplitAssignment(sourceSplitChangeResult); // If the failed subtask has already restarted, we need to assign splits to it if (context.registeredReaders().containsKey(subtaskId)) { sendSplitChangesToRemote(Collections.singleton(subtaskId)); @@ -321,7 +322,7 @@ public class RocketMQSourceEnumerator } final Set<RocketMQSourceSplit> pendingAssignmentForReader = - this.pendingSplitAssignmentMap.get(pendingReader); + this.pendingSplitAssignmentMap.remove(pendingReader); // Put pending assignment into incremental assignment if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) { diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java index b0bca4a..3a07966 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java @@ -36,13 +36,11 @@ import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit; import org.apache.flink.connector.rocketmq.source.util.UtilAll; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; - import org.apache.rocketmq.common.message.MessageQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -190,7 +188,6 @@ public class RocketMQSplitReader<T> implements SplitReader<MessageView, RocketMQ public void wakeUp() { LOG.debug("Wake up the split reader in case the fetcher thread is blocking in fetch()."); wakeup = true; - this.consumer.wakeup(); } @Override