This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 0d07152244 [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK (#9181) 0d07152244 is described below commit 0d07152244153db1299d75a1a64ebb0a8b891d05 Author: gunli <24350...@qq.com> AuthorDate: Wed Nov 1 09:47:14 2023 +0800 [INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK (#9181) Co-authored-by: gunli <gu...@tencent.com> --- .../dataproxy-sdk-golang/dataproxy/request.go | 23 +++++++++++++++++----- .../dataproxy-sdk-golang/dataproxy/worker.go | 6 +++++- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go index d93b0af006..187e074415 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go @@ -53,7 +53,9 @@ func init() { } batchPool = &sync.Pool{ New: func() interface{} { - return &batchReq{} + return &batchReq{ + dataReqs: make([]*sendDataReq, 0, 50), + } }, } } @@ -109,12 +111,17 @@ func (b *batchReq) append(req *sendDataReq) { func (b *batchReq) done(err error) { errorCode := getErrorCode(err) - for _, req := range b.dataReqs { + for i, req := range b.dataReqs { req.done(err, errorCode) + b.dataReqs[i] = nil + } + if b.dataReqs != nil { + b.dataReqs = b.dataReqs[:0] } if b.callback != nil { b.callback() + b.callback = nil } if b.buffer != nil && b.bufferPool != nil { @@ -128,10 +135,12 @@ func (b *batchReq) done(err error) { } b.metrics.observeTime(errorCode, time.Since(b.batchTime).Milliseconds()) b.metrics.observeSize(errorCode, b.dataSize) + b.metrics = nil } if b.pool != nil { b.pool.Put(b) + b.pool = nil } } @@ -334,25 +343,29 @@ type sendDataReq struct { func (s *sendDataReq) done(err error, errCode string) { if s.semaphore != nil { s.semaphore.Release() + if s.metrics != nil { + s.metrics.decPending(s.workerID) + } + s.semaphore = nil } if s.callback != nil { s.callback(s.msg, err) + s.callback = nil } if s.metrics != nil { - if s.semaphore != nil { - s.metrics.decPending(s.workerID) - } if errCode == "" { errCode = getErrorCode(err) } s.metrics.incMessage(errCode) + s.metrics = nil } if s.pool != nil { s.pool.Put(s) + s.pool = nil } } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go index be3772c38d..d5ab3018cf 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go @@ -322,13 +322,17 @@ func (w *worker) handleSendData(req *sendDataReq) { if !ok { streamID := req.msg.StreamID batch = batchPool.Get().(*batchReq) + dataReqs := batch.dataReqs + if dataReqs == nil { + dataReqs = make([]*sendDataReq, 0, w.options.BatchingMaxMessages) + } *batch = batchReq{ pool: batchPool, workerID: w.indexStr, batchID: util.SnowFlakeID(), groupID: w.options.GroupID, streamID: streamID, - dataReqs: make([]*sendDataReq, 0, w.options.BatchingMaxMessages), + dataReqs: dataReqs, batchTime: time.Now(), retries: 0, bufferPool: w.bufferPool,