This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new bd8861c fix bug:seek offset won't work due to wrong map key type (#1184)
bd8861c is described below

commit bd8861cb04b456693af7dff9c55c8c5451cad12b
Author: muyun.cyt <921148...@qq.com>
AuthorDate: Fri Dec 20 11:04:31 2024 +0800

    fix bug:seek offset won't work due to wrong map key type (#1184)

    Co-authored-by: muyun.cyt <muyun....@antgroup.com>
---
 consumer/pull_consumer.go | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 3258779..9592061 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -208,12 +208,17 @@ func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, origin
 	if pc.SubType != Assign {
 		return originOffset
 	}
-	value, exist := pc.mq2seekOffset.LoadAndDelete(mq)
+	value, exist := pc.mq2seekOffset.LoadAndDelete(*mq)
 	if !exist {
 		return originOffset
 	} else {
 		nextOffset := value.(int64)
 		_ = pc.updateOffset(mq, nextOffset)
+		rlog.Info("pull consumer assign new offset", map[string]interface{}{
+			"group": pc.GroupName,
+			"mq": mq,
+			"offset": nextOffset,
+		})
 		return nextOffset
 	}
 }
@@ -711,7 +716,7 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.Mes
 }
 
 func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) {
-	pc.mq2seekOffset.Store(mq, offset)
+	pc.mq2seekOffset.Store(*mq, offset)
 	rlog.Info("pull consumer seek offset", map[string]interface{}{
 		"mq": mq,
 		"offset": offset,
@@ -881,6 +886,8 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
 			pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
 		}
 
+		rlog.Debug(fmt.Sprintf("defaultPullConsumer pull message from broker: %s, request: %+v", brokerResult.BrokerAddr, pullRequest), nil)
+
 		result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
 		if err != nil {
 			rlog.Warning("defaultPullConsumer pull message from broker error", map[string]interface{}{