This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d07152244 [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK (#9181)
0d07152244 is described below

commit 0d07152244153db1299d75a1a64ebb0a8b891d05
Author: gunli <24350...@qq.com>
AuthorDate: Wed Nov 1 09:47:14 2023 +0800

    [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK (#9181)
    
    Co-authored-by: gunli <gu...@tencent.com>
---
 .../dataproxy-sdk-golang/dataproxy/request.go      | 23 +++++++++++++++++-----
 .../dataproxy-sdk-golang/dataproxy/worker.go       |  6 +++++-
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index d93b0af006..187e074415 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -53,7 +53,9 @@ func init() {
        }
        batchPool = &sync.Pool{
                New: func() interface{} {
-                       return &batchReq{}
+                       return &batchReq{
+                               dataReqs: make([]*sendDataReq, 0, 50),
+                       }
                },
        }
 }
@@ -109,12 +111,17 @@ func (b *batchReq) append(req *sendDataReq) {
 
 func (b *batchReq) done(err error) {
        errorCode := getErrorCode(err)
-       for _, req := range b.dataReqs {
+       for i, req := range b.dataReqs {
                req.done(err, errorCode)
+               b.dataReqs[i] = nil
+       }
+       if b.dataReqs != nil {
+               b.dataReqs = b.dataReqs[:0]
        }
 
        if b.callback != nil {
                b.callback()
+               b.callback = nil
        }
 
        if b.buffer != nil && b.bufferPool != nil {
@@ -128,10 +135,12 @@ func (b *batchReq) done(err error) {
                }
                b.metrics.observeTime(errorCode, time.Since(b.batchTime).Milliseconds())
                b.metrics.observeSize(errorCode, b.dataSize)
+               b.metrics = nil
        }
 
        if b.pool != nil {
                b.pool.Put(b)
+               b.pool = nil
        }
 }
 
@@ -334,25 +343,29 @@ type sendDataReq struct {
 func (s *sendDataReq) done(err error, errCode string) {
        if s.semaphore != nil {
                s.semaphore.Release()
+               if s.metrics != nil {
+                       s.metrics.decPending(s.workerID)
+               }
+               s.semaphore = nil
        }
 
        if s.callback != nil {
                s.callback(s.msg, err)
+               s.callback = nil
        }
 
        if s.metrics != nil {
-               if s.semaphore != nil {
-                       s.metrics.decPending(s.workerID)
-               }
                if errCode == "" {
                        errCode = getErrorCode(err)
                }
 
                s.metrics.incMessage(errCode)
+               s.metrics = nil
        }
 
        if s.pool != nil {
                s.pool.Put(s)
+               s.pool = nil
        }
 }
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index be3772c38d..d5ab3018cf 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -322,13 +322,17 @@ func (w *worker) handleSendData(req *sendDataReq) {
        if !ok {
                streamID := req.msg.StreamID
                batch = batchPool.Get().(*batchReq)
+               dataReqs := batch.dataReqs
+               if dataReqs == nil {
+                       dataReqs = make([]*sendDataReq, 0, w.options.BatchingMaxMessages)
+               }
                *batch = batchReq{
                        pool:       batchPool,
                        workerID:   w.indexStr,
                        batchID:    util.SnowFlakeID(),
                        groupID:    w.options.GroupID,
                        streamID:   streamID,
-                       dataReqs:   make([]*sendDataReq, 0, w.options.BatchingMaxMessages),
+                       dataReqs:   dataReqs,
                        batchTime:  time.Now(),
                        retries:    0,
                        bufferPool: w.bufferPool,

Reply via email to