This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ffb599  [ISSUE #1112] feat: optimize producer send async (#1111)
7ffb599 is described below

commit 7ffb599169034023d9f14e38c7599dc62be1f140
Author: WeizhongTu <weizhong....@alibaba-inc.com>
AuthorDate: Mon Dec 4 11:12:31 2023 +0800

    [ISSUE #1112] feat: optimize producer send async (#1111)
    
    * feat: optimize producer send async
    
    * fix: fix mq override bug
---
 internal/remote/remote_client.go | 25 +++++++++++++------------
 producer/producer.go             |  2 +-
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 36fbea7..c0ef6ce 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -112,23 +112,24 @@ func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *R
 
 // InvokeAsync send request without blocking, just return immediately.
 func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error {
-       conn, err := c.connect(ctx, addr)
-       if err != nil {
-               return err
-       }
-
        resp := NewResponseFuture(ctx, request.Opaque, callback)
        c.responseTable.Store(resp.Opaque, resp)
 
-       err = c.sendRequest(ctx, conn, request)
-       if err != nil {
-               c.responseTable.Delete(request.Opaque)
-               return err
-       }
-
        go primitive.WithRecover(func() {
+               defer resp.executeInvokeCallback()
+               defer c.responseTable.Delete(request.Opaque)
+
+               conn, err := c.connect(ctx, addr)
+               if err != nil {
+                       resp.Err = err
+                       return
+               }
+               err = c.sendRequest(ctx, conn, request)
+               if err != nil {
+                       resp.Err = err
+                       return
+               }
                c.receiveAsync(resp)
-               c.responseTable.Delete(request.Opaque)
        })
 
        return nil
diff --git a/producer/producer.go b/producer/producer.go
index 70e8d01..eb3cd2e 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -329,7 +329,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
                if mq != nil {
                        lastBrokerName = mq.BrokerName
                }
-               mq := p.selectMessageQueue(msg, lastBrokerName)
+               mq = p.selectMessageQueue(msg, lastBrokerName)
                if mq == nil {
                        err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
                        continue

Reply via email to