This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new ee932fa35b [INLONG-8637][SDK] Pool data request and batch request (#8638)
ee932fa35b is described below

commit ee932fa35b675bbc8168777416cc31a1721eefa4
Author: gunli <24350...@qq.com>
AuthorDate: Sun Aug 6 15:56:24 2023 +0800

    [INLONG-8637][SDK] Pool data request and batch request (#8638)

    Co-authored-by: gunli <gu...@tencent.com>
---
 .../dataproxy-sdk-golang/dataproxy/request.go | 26 ++++++++++++++++++++++
 .../dataproxy-sdk-golang/dataproxy/worker.go | 8 +++++--
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index ac89aba142..d93b0af006 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -22,6 +22,7 @@ import (
 	"encoding/binary"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 	"unsafe"
 
@@ -35,6 +36,8 @@ var (
 	byteOrder = binary.BigEndian
 	heartbeatRsp = []byte{0x00, 0x00, 0x00, 0x01, 0x01}
 	heartbeatRspLen = len(heartbeatRsp)
+	reqPool *sync.Pool
+	batchPool *sync.Pool
 )
 
 const (
@@ -42,6 +45,19 @@ const (
 	msgTypeHeartbeat uint8 = 1
 )
 
+func init() {
+	reqPool = &sync.Pool{
+		New: func() interface{} {
+			return &sendDataReq{}
+		},
+	}
+	batchPool = &sync.Pool{
+		New: func() interface{} {
+			return &batchReq{}
+		},
+	}
+}
+
 type heartbeatReq struct {
 }
 
@@ -68,6 +84,7 @@ func (h heartbeatReq) encode(buffer *bytes.Buffer) []byte {
 type batchCallback func()
 
 type batchReq struct {
+	pool *sync.Pool
 	workerID string
 	batchID string
 	groupID string
@@ -112,6 +129,10 @@ func (b *batchReq) done(err error) {
 		b.metrics.observeTime(errorCode, time.Since(b.batchTime).Milliseconds())
 		b.metrics.observeSize(errorCode, b.dataSize)
 	}
+
+	if b.pool != nil {
+		b.pool.Put(b)
+	}
 }
 
 func (b *batchReq) encode() []byte {
@@ -299,6 +320,7 @@ func (b *batchRsp) decode(input []byte) {
 }
 
 type sendDataReq struct {
+	pool *sync.Pool
 	ctx context.Context
 	msg Message
 	callback Callback
@@ -328,6 +350,10 @@ func (s *sendDataReq) done(err error, errCode string) {
 
 		s.metrics.incMessage(errCode)
 	}
+
+	if s.pool != nil {
+		s.pool.Put(s)
+	}
 }
 
 type closeReq struct {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 7edfc0c6dc..eb4a8348ac 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -244,7 +244,9 @@ func (w *worker) start() {
 }
 
 func (w *worker) doSendAsync(ctx context.Context, msg Message, callback Callback, flushImmediately bool) {
-	req := &sendDataReq{
+	req := reqPool.Get().(*sendDataReq)
+	*req = sendDataReq{
+		pool: reqPool,
 		ctx: ctx,
 		msg: msg,
 		callback: callback,
@@ -319,7 +321,9 @@ func (w *worker) handleSendData(req *sendDataReq) {
 	batch, ok := w.pendingBatches[req.msg.StreamID]
 	if !ok {
 		streamID := req.msg.StreamID
-		batch = &batchReq{
+		batch = batchPool.Get().(*batchReq)
+		*batch = batchReq{
+			pool: batchPool,
 			workerID: w.indexStr,
 			batchID: util.SnowFlakeID(),
 			groupID: w.options.GroupID,