This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new 7ffb599 [ISSUE #1112] feat: optimize producer send async (#1111)
7ffb599 is described below

commit 7ffb599169034023d9f14e38c7599dc62be1f140
Author: WeizhongTu <weizhong....@alibaba-inc.com>
AuthorDate: Mon Dec 4 11:12:31 2023 +0800

    [ISSUE #1112] feat: optimize producer send async (#1111)

    * feat: optimize producer send async

    * fix: fix mq override bug
---
 internal/remote/remote_client.go | 25 +++++++++++++------------
 producer/producer.go | 2 +-
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 36fbea7..c0ef6ce 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -112,23 +112,24 @@ func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *R
 // InvokeAsync send request without blocking, just return immediately.
 func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error {
-	conn, err := c.connect(ctx, addr)
-	if err != nil {
-		return err
-	}
-
 	resp := NewResponseFuture(ctx, request.Opaque, callback)
 	c.responseTable.Store(resp.Opaque, resp)
-	err = c.sendRequest(ctx, conn, request)
-	if err != nil {
-		c.responseTable.Delete(request.Opaque)
-		return err
-	}
-
 	go primitive.WithRecover(func() {
+		defer resp.executeInvokeCallback()
+		defer c.responseTable.Delete(request.Opaque)
+
+		conn, err := c.connect(ctx, addr)
+		if err != nil {
+			resp.Err = err
+			return
+		}
+		err = c.sendRequest(ctx, conn, request)
+		if err != nil {
+			resp.Err = err
+			return
+		}
 		c.receiveAsync(resp)
-		c.responseTable.Delete(request.Opaque)
 	})
 	return nil

diff --git a/producer/producer.go b/producer/producer.go
index 70e8d01..eb3cd2e 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -329,7 +329,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
 		if mq != nil {
 			lastBrokerName = mq.BrokerName
 		}
-		mq := p.selectMessageQueue(msg, lastBrokerName)
+		mq = p.selectMessageQueue(msg, lastBrokerName)
 		if mq == nil {
 			err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
 			continue