This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ee932fa35b [INLONG-8637][SDK] Pool data request and batch request (#8638)
ee932fa35b is described below

commit ee932fa35b675bbc8168777416cc31a1721eefa4
Author: gunli <24350...@qq.com>
AuthorDate: Sun Aug 6 15:56:24 2023 +0800

    [INLONG-8637][SDK] Pool data request and batch request (#8638)
    
    Co-authored-by: gunli <gu...@tencent.com>
---
 .../dataproxy-sdk-golang/dataproxy/request.go      | 26 ++++++++++++++++++++++
 .../dataproxy-sdk-golang/dataproxy/worker.go       |  8 +++++--
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index ac89aba142..d93b0af006 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -22,6 +22,7 @@ import (
        "encoding/binary"
        "strconv"
        "strings"
+       "sync"
        "time"
        "unsafe"
 
@@ -35,6 +36,8 @@ var (
        byteOrder       = binary.BigEndian
        heartbeatRsp    = []byte{0x00, 0x00, 0x00, 0x01, 0x01}
        heartbeatRspLen = len(heartbeatRsp)
+       reqPool         *sync.Pool
+       batchPool       *sync.Pool
 )
 
 const (
@@ -42,6 +45,19 @@ const (
        msgTypeHeartbeat uint8 = 1
 )
 
+func init() {
+       reqPool = &sync.Pool{
+               New: func() interface{} {
+                       return &sendDataReq{}
+               },
+       }
+       batchPool = &sync.Pool{
+               New: func() interface{} {
+                       return &batchReq{}
+               },
+       }
+}
+
 type heartbeatReq struct {
 }
 
@@ -68,6 +84,7 @@ func (h heartbeatReq) encode(buffer *bytes.Buffer) []byte {
 
 type batchCallback func()
 type batchReq struct {
+       pool         *sync.Pool
        workerID     string
        batchID      string
        groupID      string
@@ -112,6 +129,10 @@ func (b *batchReq) done(err error) {
                b.metrics.observeTime(errorCode, time.Since(b.batchTime).Milliseconds())
                b.metrics.observeSize(errorCode, b.dataSize)
        }
+
+       if b.pool != nil {
+               b.pool.Put(b)
+       }
 }
 
 func (b *batchReq) encode() []byte {
@@ -299,6 +320,7 @@ func (b *batchRsp) decode(input []byte) {
 }
 
 type sendDataReq struct {
+       pool             *sync.Pool
        ctx              context.Context
        msg              Message
        callback         Callback
@@ -328,6 +350,10 @@ func (s *sendDataReq) done(err error, errCode string) {
 
                s.metrics.incMessage(errCode)
        }
+
+       if s.pool != nil {
+               s.pool.Put(s)
+       }
 }
 
 type closeReq struct {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 7edfc0c6dc..eb4a8348ac 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -244,7 +244,9 @@ func (w *worker) start() {
 }
 
 func (w *worker) doSendAsync(ctx context.Context, msg Message, callback Callback, flushImmediately bool) {
-       req := &sendDataReq{
+       req := reqPool.Get().(*sendDataReq)
+       *req = sendDataReq{
+               pool:             reqPool,
                ctx:              ctx,
                msg:              msg,
                callback:         callback,
@@ -319,7 +321,9 @@ func (w *worker) handleSendData(req *sendDataReq) {
        batch, ok := w.pendingBatches[req.msg.StreamID]
        if !ok {
                streamID := req.msg.StreamID
-               batch = &batchReq{
+               batch = batchPool.Get().(*batchReq)
+               *batch = batchReq{
+                       pool:       batchPool,
                        workerID:   w.indexStr,
                        batchID:    util.SnowFlakeID(),
                        groupID:    w.options.GroupID,

Reply via email to