This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9995c22774 [INLONG-11668][SDK] Add max life time support for the 
connections in conn pool of Golang SDK (#11669)
9995c22774 is described below

commit 9995c22774559b32f828935060d5c7e12085b2bb
Author: gunli <24350...@qq.com>
AuthorDate: Wed Jan 15 11:05:37 2025 +0800

    [INLONG-11668][SDK] Add max life time support for the connections in conn 
pool of Golang SDK (#11669)
---
 .../dataproxy-sdk-golang/README.md                 |   2 +
 .../dataproxy-sdk-golang/connpool/connpool.go      | 191 ++++++++++++++++++---
 .../dataproxy-sdk-golang/dataproxy/client.go       |  10 +-
 .../dataproxy-sdk-golang/dataproxy/options.go      |   4 +-
 .../dataproxy/options_basic.go                     |   7 +
 5 files changed, 180 insertions(+), 34 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
index 3273b2a464..60c8336970 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
@@ -161,6 +161,8 @@ type Options struct {
        BlockIfQueueIsFull      bool                  // whether Send and 
SendAsync block if producer's message queue is full, default: false
        AddColumns              map[string]string     // addition columns to 
add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all 
the message will be added 2 more columns with worldid=xxx and ip=yyy
        addColumnStr            string                // the string format of 
the AddColumns, just a cache, used internal
+       Auth                    Auth                  // dataproxy 
authentication interface
+       MaxConnLifetime         time.Duration         // connection max lifetime, default: 0 (no limit), set to 5m/10m when the servers provide service through CLBs (Cloud Load Balancers)
 }
 ```
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
index d3bee80d0c..1054041cb4 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
@@ -45,23 +45,42 @@ var (
 
 // Dialer is the interface of a dialer that return a NetConn
 type Dialer interface {
-       Dial(addr string) (gnet.Conn, error)
+       // Dial dials addr and binds ctx to the returned connection as its context; which network (TCP/UDP) to use is determined by the Dialer.
+       // Dial should use gnet.Client.DialContext() to get a connection that 
can be driven by a gnet event engine.
+       Dial(addr string, ctx any) (gnet.Conn, error)
+}
+
+// ConnContext holds the additional attributes bound to a gnet.Conn as its context.
+type ConnContext struct {
+       CreatedAt time.Time // the created time of the connection
+       Endpoint  string    // the address of the remote endpoint
 }
 
 // EndpointRestrictedConnPool is the interface of a simple endpoint restricted 
connection pool that
 // the connection's remote address must be in an endpoint list, if not, it 
will be closed and can
 // not be used anymore, it is useful for holding the connections to a service 
whose endpoints can
 // be changed at runtime.
+// Best practice:
+// gnet is a high-performance networking package, the best way to use this 
pool is:
+//  1. call Get() to get a gnet.Conn;
+//  2. use the conn to read/write for a while (1m, for example), then put it back to the pool and get a new one for load balancing; avoid putting/getting too frequently;
+//  3. do not switch (put and get) to a new conn in the callback of gnet.Conn.AsyncWrite(buf []byte, callback AsyncCallback) or gnet.Conn.AsyncWritev(bs [][]byte, callback AsyncCallback), it may block;
+//  4. if you use TCP conns and can not update endpoints by service discovery directly, for example, your endpoints are behind an LB, it is better to set a max lifetime
+//     for your pool, so that you can restart your endpoints (RS) without data loss by:
+//     1). set the weight of your endpoint (RS) to 0, so that no new connections come in;
+//     2). wait for the existing connections to be closed when their lifetime expires;
+//     3). restart your endpoint.
 type EndpointRestrictedConnPool interface {
-       // Get gets a connection
+       // Get gets a connection, it's concurrency-safe, but you can not call 
it in the callback of gnet.Conn.AsyncWrite() or gnet.Conn.AsyncWritev().
        Get() (gnet.Conn, error)
-       // Put puts a connection back to the pool, if err is not nil, the 
connection will be closed by the pool
+       // Put puts a connection back to the pool, if err is not nil, the 
connection will be closed by the pool, it's concurrency-safe,
+       // but you can not call it in the callback of gnet.Conn.AsyncWrite() or 
gnet.Conn.AsyncWritev().
        Put(conn gnet.Conn, err error)
-       // UpdateEndpoints updates the endpoints the pool to dial to
+       // UpdateEndpoints updates the endpoints the pool dials to, it's not concurrency-safe.
        UpdateEndpoints(all, add, del []string)
-       // NumPooled returns the connection number in the pool, not the number 
of all the connection that the pool created
+       // NumPooled returns the number of connections in the pool, not the number of all the connections that the pool created, it's concurrency-safe.
        NumPooled() int
-       // OnConnClosed used to notify that a connection is closed, the 
connection will be removed from the pool, if err is not nil, the remote 
endpoint will mark as unavailable
+       // OnConnClosed is used to notify that a connection is closed; the connection will be removed from the pool, and if err is not nil, the remote endpoint will be marked as unavailable, it's concurrency-safe.
        OnConnClosed(conn gnet.Conn, err error)
        // Close closes the pool
        Close()
@@ -69,7 +88,7 @@ type EndpointRestrictedConnPool interface {
 
 // NewConnPool news a EndpointRestrictedConnPool
 func NewConnPool(initEndpoints []string, connsPerEndpoint, size int,
-       dialer Dialer, log logger.Logger) (EndpointRestrictedConnPool, error) {
+       dialer Dialer, log logger.Logger, maxConnLifetime time.Duration) 
(EndpointRestrictedConnPool, error) {
        if len(initEndpoints) == 0 {
                return nil, ErrInitEndpointEmpty
        }
@@ -107,7 +126,8 @@ func NewConnPool(initEndpoints []string, connsPerEndpoint, 
size int,
                        Multiplier:      2,
                        Randomization:   0.5,
                },
-               closeCh: make(chan struct{}),
+               closeCh:         make(chan struct{}),
+               maxConnLifetime: maxConnLifetime, // <= 0 disables lifetime-based expiry
        }
 
        // store endpoints
@@ -143,7 +163,25 @@ type connPool struct {
        backoff            util.ExponentialBackoff
        closeCh            chan struct{}
        closeOnce          sync.Once
-       endpointConnCounts sync.Map // store the conn count of each endpoint
+       endpointConnCounts sync.Map      // store the conn count of each 
endpoint
+       maxConnLifetime    time.Duration // the max lifetime of a connection, <= 0 means no limit
+}
+
+func (p *connPool) expired(conn gnet.Conn) bool { // reports whether conn has outlived p.maxConnLifetime
+       if conn == nil || p.maxConnLifetime <= 0 { // no conn, or no lifetime limit configured
+               return false
+       }
+
+       ctx := conn.Context()
+       if ctx == nil {
+               return false
+       }
+
+       connCtx, ok := ctx.(ConnContext) // bound by dialNewConn()/recover() when the conn is dialed
+       if !ok {
+               return false // no creation time available, treat as never expiring
+       }
+       return connCtx.CreatedAt.Add(p.maxConnLifetime).Before(time.Now())
 }
 
 func (p *connPool) Get() (gnet.Conn, error) {
@@ -204,7 +242,7 @@ func (p *connPool) newConn() (gnet.Conn, error) {
 
 func (p *connPool) dialNewConn(ep string) (gnet.Conn, error) {
        p.log.Debug("dialNewConn()")
-       conn, err := p.dialer.Dial(ep)
+       conn, err := p.dialer.Dial(ep, ConnContext{CreatedAt: time.Now(), 
Endpoint: ep})
        if err != nil {
                p.markUnavailable(ep)
                return nil, err
@@ -278,6 +316,15 @@ func (p *connPool) put(conn gnet.Conn, err error, 
isNewConn bool) {
                return
        }
 
+       // if conn has outlived maxConnLifetime, close it instead of putting it back to the pool
+       if p.expired(conn) {
+               p.log.Debug("connection expired, close it, addr:", addr, ", 
err:", err)
+               CloseConn(conn, defaultConnCloseDelay)
+               // closing the conn leaves fewer usable conns and may unbalance the conn count of addr's endpoint, so append a new conn here even though that calls this function recursively
+               _ = p.appendNewConn(addr) // a failure is already logged by appendNewConn()
+               return
+       }
+
        select {
        case p.connChan <- conn:
                // update the conn count
@@ -488,6 +535,15 @@ func (p *connPool) recoverAndRebalance() {
        reBalanceTicker := time.NewTicker(defaultConnCloseDelay + 
30*time.Second)
        defer reBalanceTicker.Stop()
 
+       // clean expired conns every minute; cleanExpiredConnCh stays nil when no max lifetime is set,
+       // and a nil channel is never ready in a select, so its case below is disabled without any busy loop
+       var cleanExpiredConnCh <-chan time.Time
+       if p.maxConnLifetime > 0 {
+               cleanExpiredConnTicker := time.NewTicker(1 * time.Minute)
+               defer cleanExpiredConnTicker.Stop()
+               cleanExpiredConnCh = cleanExpiredConnTicker.C
+       }
+
        for {
                select {
                case <-recoverTicker.C:
@@ -502,10 +558,74 @@ func (p *connPool) recoverAndRebalance() {
                        p.rebalance()
                case <-p.closeCh:
                        return
+               case <-cleanExpiredConnCh: // only fires when maxConnLifetime > 0
+                       p.cleanExpiredConns()
                }
        }
 }
 
+// getRemoteAddr returns conn's remote address, falling back to the Endpoint of its ConnContext, or "" if neither is available.
+func getRemoteAddr(conn gnet.Conn) string {
+       if conn == nil {
+               return ""
+       }
+
+       addr := conn.RemoteAddr()
+       if addr != nil {
+               return addr.String()
+       }
+       ctx := conn.Context()
+       if ctx == nil {
+               return ""
+       }
+
+       connCtx, ok := ctx.(ConnContext)
+       if !ok {
+               return ""
+       }
+       return connCtx.Endpoint
+}
+
+// cleanExpiredConns drains the pooled conns, puts the unexpired ones back and replaces each expired one with a new conn to the same address.
+func (p *connPool) cleanExpiredConns() {
+       p.log.Debug("cleanExpiredConns()")
+       var leftConns []gnet.Conn
+       var expiredConns []gnet.Conn
+loop:
+       for i := 0; i < cap(p.connChan); i++ {
+               select {
+               case conn := <-p.connChan:
+                       if p.expired(conn) {
+                               expiredConns = append(expiredConns, conn)
+                               continue
+                       }
+
+                       // not the expired conn, put it back
+                       leftConns = append(leftConns, conn)
+               default:
+                       // no more conn, exit the loop
+                       break loop
+               }
+       }
+
+       // put the conn back to the chan
+       for _, left := range leftConns {
+               select {
+               case p.connChan <- left:
+               default:
+                       CloseConn(left, defaultConnCloseDelay)
+               }
+       }
+
+       // close the expired conn and append new conn with the same addr
+       for _, expired := range expiredConns {
+               addr := getRemoteAddr(expired)
+               p.log.Debug("connection expired, close it, addr:", addr, ", 
err:", nil)
+               CloseConn(expired, defaultConnCloseDelay)
+               _ = p.appendNewConn(addr)
+       }
+}
+
 func (p *connPool) dump() {
        p.log.Debug("all endpoints:")
        eps := p.endpoints.Load()
@@ -542,7 +669,7 @@ func (p *connPool) recover() bool {
                }
                if time.Since(lastUnavailable) > p.backoff.Next(retries) {
                        // try to create new conn
-                       conn, err := p.dialer.Dial(key.(string))
+                       conn, err := p.dialer.Dial(key.(string), 
ConnContext{CreatedAt: time.Now(), Endpoint: key.(string)})
                        if err == nil {
                                p.log.Debug("endpoint recovered, addr: ", key)
                                p.put(conn, nil, true)
@@ -675,15 +802,11 @@ func (p *connPool) rebalance() {
 
                        // add new conn
                        for i := currentCount; i < expectedConnPerEndpoint; i++ 
{
-                               conn, err := p.dialNewConn(addr)
-                               if err == nil {
-                                       p.log.Debug("adding connection for 
addr: ", addr)
-                                       p.put(conn, nil, true)
-                                       rebalanced = true
-                               } else {
-                                       p.log.Warn("failed to add connection 
during rebalancing, addr: ", addr, ", err: ", err)
-                                       break
+                               err := p.appendNewConn(addr)
+                               if err != nil {
+                                       break // dialNewConn() just marked addr unavailable; leave redialing to recover()'s backoff
                                }
+                               rebalanced = true
                        }
                } else if currentCount > expectedConnPerEndpoint {
                        rebalanced = true
@@ -699,15 +822,11 @@ func (p *connPool) rebalance() {
                        return true
                }
                for i := 0; i < expectedConnPerEndpoint; i++ {
-                       conn, err := p.dialNewConn(addr)
-                       if err == nil {
-                               p.log.Debug("adding connection for addr: ", 
addr)
-                               p.put(conn, nil, true)
-                               rebalanced = true
-                       } else {
-                               p.log.Warn("failed to add connection during 
rebalancing, addr: ", addr, ", err: ", err)
-                               break
+                       err := p.appendNewConn(addr)
+                       if err != nil {
+                               break // dialNewConn() just marked addr unavailable; leave redialing to recover()'s backoff
                        }
+                       rebalanced = true
                }
                return true
        })
@@ -717,6 +836,22 @@ func (p *connPool) rebalance() {
        }
 }
 
+func (p *connPool) appendNewConn(addr string) error { // dials a new conn to addr and puts it into the pool
+       if addr == "" { // e.g. getRemoteAddr() could not determine the endpoint of an expired conn
+               return errors.New("addr is empty")
+       }
+
+       conn, err := p.dialNewConn(addr) // on failure, dialNewConn() marks addr unavailable
+       if err != nil {
+               p.log.Warn("failed to add connection, addr: ", addr, ", err: ", 
err)
+               return err
+       }
+
+       p.log.Debug("adding connection for addr: ", addr)
+       p.put(conn, nil, true) // hand the fresh conn to the pool as a new conn
+       return nil
+}
+
 func (p *connPool) removeEndpointConn(addr string, count int) {
        var leftConns []gnet.Conn
        var removed int
@@ -730,7 +865,7 @@ loop:
                        }
 
                        if remoteAddr.String() == addr {
-                               p.log.Info("reducing connection for addr: ", 
addr)
+                               p.log.Debug("reducing connection for addr: ", 
addr)
                                // we do not decrease conn count here, if the 
frequence of rebalancing is less then defaultConnCloseDelay, may lead to an 
inaccurate expected conn count per endpoint
                                CloseConn(conn, defaultConnCloseDelay)
                                removed++
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index 5ea6781276..3e1238ce94 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -166,7 +166,7 @@ func (c *client) initConns() error {
 
        // minimum connection number per endpoint is 1
        connsPerEndpoint := int(math.Ceil(float64(c.options.WorkerNum) * 1.2 / 
float64(epLen)))
-       pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, 
c.log)
+       pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, 
c.log, c.options.MaxConnLifetime)
        if err != nil {
                return err
        }
@@ -176,7 +176,7 @@ func (c *client) initConns() error {
 }
 
 func (c *client) initFramer() error {
-       framer, err := framer.NewLengthField(framer.LengthFieldCfg{
+       fr, err := framer.NewLengthField(framer.LengthFieldCfg{ // named fr so the framer package is not shadowed
                MaxFrameLen:  64 * 1024,
                FieldOffset:  0,
                FieldLength:  4,
@@ -186,7 +186,7 @@ func (c *client) initFramer() error {
        if err != nil {
                return err
        }
-       c.framer = framer
+       c.framer = fr
        return nil
 }
 
@@ -211,8 +211,8 @@ func (c *client) initWorkers() error {
        return nil
 }
 
-func (c *client) Dial(addr string) (gnet.Conn, error) {
-       return c.netClient.Dial("tcp", addr)
+func (c *client) Dial(addr string, ctx any) (gnet.Conn, error) { // implements connpool.Dialer
+       return c.netClient.DialContext("tcp", addr, ctx) // always TCP; ctx (a connpool.ConnContext when called by the pool) is bound to the conn
 }
 
 func (c *client) Send(ctx context.Context, msg Message) error {
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
index 490e90ce79..5d4692f077 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
@@ -23,9 +23,10 @@ import (
 
        
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util"
 
+       "github.com/prometheus/client_golang/prometheus"
+
        
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
        
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
-       "github.com/prometheus/client_golang/prometheus"
 )
 
 const (
@@ -78,6 +79,7 @@ type Options struct {
        AddColumns              map[string]string     // addition columns to 
add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all 
the message will be added 2 more columns with worldid=xxx and ip=yyy
        addColumnStr            string                // the string format of 
the AddColumns, just a cache, used internal
        Auth                    Auth                  // dataproxy 
authentication interface
+       MaxConnLifetime         time.Duration         // connection max lifetime, default: 0 (no limit), set to 5m/10m when the servers provide service through CLBs (Cloud Load Balancers)
 }
 
 // ValidateAndSetDefault validates an options and set up the default values
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go
 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go
index 5265a1a628..de3c228769 100755
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go
@@ -182,3 +182,10 @@ func WithAuth(auth Auth) Option {
                o.Auth = auth
        }
 }
+
+// WithMaxConnLifetime sets MaxConnLifetime, the max lifetime of a pooled connection; 0 (the default) or a negative value means no limit
+func WithMaxConnLifetime(lifetime time.Duration) Option {
+       return func(o *Options) {
+               o.MaxConnLifetime = lifetime
+       }
+}

Reply via email to