This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new bd8861c  fix bug:seek offset won't work due to wrong map key type (#1184)
bd8861c is described below

commit bd8861cb04b456693af7dff9c55c8c5451cad12b
Author: muyun.cyt <921148...@qq.com>
AuthorDate: Fri Dec 20 11:04:31 2024 +0800

    fix bug:seek offset won't work due to wrong map key type (#1184)
    
    Co-authored-by: muyun.cyt <muyun....@antgroup.com>
---
 consumer/pull_consumer.go | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 3258779..9592061 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -208,12 +208,17 @@ func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, origin
        if pc.SubType != Assign {
                return originOffset
        }
-       value, exist := pc.mq2seekOffset.LoadAndDelete(mq)
+       value, exist := pc.mq2seekOffset.LoadAndDelete(*mq)
        if !exist {
                return originOffset
        } else {
                nextOffset := value.(int64)
                _ = pc.updateOffset(mq, nextOffset)
+               rlog.Info("pull consumer assign new offset", map[string]interface{}{
+                       "group":  pc.GroupName,
+                       "mq":     mq,
+                       "offset": nextOffset,
+               })
                return nextOffset
        }
 }
@@ -711,7 +716,7 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.Mes
 }
 
 func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) {
-       pc.mq2seekOffset.Store(mq, offset)
+       pc.mq2seekOffset.Store(*mq, offset)
        rlog.Info("pull consumer seek offset", map[string]interface{}{
                "mq":     mq,
                "offset": offset,
@@ -881,6 +886,8 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
                        pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
                }
 
+               rlog.Debug(fmt.Sprintf("defaultPullConsumer pull message from broker: %s, request: %+v", brokerResult.BrokerAddr, pullRequest), nil)
+
                result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
                if err != nil {
                        rlog.Warning("defaultPullConsumer pull message from broker error", map[string]interface{}{

Reply via email to