This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 9995c22774 [INLONG-11668][SDK] Add max life time support for the connections in conn pool of Golang SDK (#11669) 9995c22774 is described below commit 9995c22774559b32f828935060d5c7e12085b2bb Author: gunli <24350...@qq.com> AuthorDate: Wed Jan 15 11:05:37 2025 +0800 [INLONG-11668][SDK] Add max life time support for the connections in conn pool of Golang SDK (#11669) --- .../dataproxy-sdk-golang/README.md | 2 + .../dataproxy-sdk-golang/connpool/connpool.go | 191 ++++++++++++++++++--- .../dataproxy-sdk-golang/dataproxy/client.go | 10 +- .../dataproxy-sdk-golang/dataproxy/options.go | 4 +- .../dataproxy/options_basic.go | 7 + 5 files changed, 180 insertions(+), 34 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md index 3273b2a464..60c8336970 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md @@ -161,6 +161,8 @@ type Options struct { BlockIfQueueIsFull bool // whether Send and SendAsync block if producer's message queue is full, default: false AddColumns map[string]string // addition columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all the message will be added 2 more columns with worldid=xxx and ip=yyy addColumnStr string // the string format of the AddColumns, just a cache, used internal + Auth Auth // dataproxy authentication interface + MaxConnLifetime time.Duration // connection max lifetime, default: 0, set to 5m/10m when the servers provide service through CLBs (Cloud Load Balancers) } ``` diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go index d3bee80d0c..1054041cb4 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go +++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go @@ -45,23 +45,42 @@ var ( // Dialer is the interface of a dialer that return a NetConn type Dialer interface { - Dial(addr string) (gnet.Conn, error) + // Dial dials the addr and binds ctx to the returned connection; which network (TCP/UDP) to use is determined by the Dialer. + // Dial should use gnet.Client.DialContext() to get a connection that can be driven by a gnet event engine. + Dial(addr string, ctx any) (gnet.Conn, error) +} + +// ConnContext holds the additional attributes to attach to a gnet.Conn +type ConnContext struct { + CreatedAt time.Time // the created time of the connection + Endpoint string // the address of the remote endpoint } // EndpointRestrictedConnPool is the interface of a simple endpoint restricted connection pool that // the connection's remote address must be in an endpoint list, if not, it will be closed and can // not be used anymore, it is useful for holding the connections to a service whose endpoints can // be changed at runtime. +// Best practice: +// gnet is a high-performance networking package, the best way to use this pool is: +// 1. call Get() to get a gnet.Conn; +// 2. use the conn to read/write for a duration, 1m, for example, and then put the conn back to the pool and get a new one for load balancing, avoid putting/getting frequently; +// 3. do not switch(put and get) to a new conn in the callback of gnet.Conn.AsyncWrite(buf []byte, callback AsyncCallback) or gnet.Conn.AsyncWritev(bs [][]byte, callback AsyncCallback), it may be blocked; +// 4. if you use TCP conn and can not update endpoints by service discovery directly, for example, your endpoints are behind an LB, it is better to set a max lifetime +// for your pool, so that you can restart your endpoints(RS) without data loss by: +// 1). set the weight of your endpoint(RS) to 0, so that no new connections come in; +// 2). 
wait for the existing connections to close by lifetime timeout; +// 3). restart your endpoint. type EndpointRestrictedConnPool interface { - // Get gets a connection + // Get gets a connection, it's concurrency-safe, but you can not call it in the callback of gnet.Conn.AsyncWrite() or gnet.Conn.AsyncWritev(). Get() (gnet.Conn, error) - // Put puts a connection back to the pool, if err is not nil, the connection will be closed by the pool + // Put puts a connection back to the pool, if err is not nil, the connection will be closed by the pool, it's concurrency-safe, + // but you can not call it in the callback of gnet.Conn.AsyncWrite() or gnet.Conn.AsyncWritev(). Put(conn gnet.Conn, err error) - // UpdateEndpoints updates the endpoints the pool to dial to + // UpdateEndpoints updates the endpoints the pool to dial to, it's not concurrency-safe. UpdateEndpoints(all, add, del []string) - // NumPooled returns the connection number in the pool, not the number of all the connection that the pool created + // NumPooled returns the connection number in the pool, not the number of all the connection that the pool created, it's concurrency-safe. NumPooled() int - // OnConnClosed used to notify that a connection is closed, the connection will be removed from the pool, if err is not nil, the remote endpoint will mark as unavailable + // OnConnClosed used to notify that a connection is closed, the connection will be removed from the pool, if err is not nil, the remote endpoint will mark as unavailable, it's concurrency-safe. 
OnConnClosed(conn gnet.Conn, err error) // Close closes the pool Close() @@ -69,7 +88,7 @@ type EndpointRestrictedConnPool interface { // NewConnPool news a EndpointRestrictedConnPool func NewConnPool(initEndpoints []string, connsPerEndpoint, size int, - dialer Dialer, log logger.Logger) (EndpointRestrictedConnPool, error) { + dialer Dialer, log logger.Logger, maxConnLifetime time.Duration) (EndpointRestrictedConnPool, error) { if len(initEndpoints) == 0 { return nil, ErrInitEndpointEmpty } @@ -107,7 +126,8 @@ func NewConnPool(initEndpoints []string, connsPerEndpoint, size int, Multiplier: 2, Randomization: 0.5, }, - closeCh: make(chan struct{}), + closeCh: make(chan struct{}), + maxConnLifetime: maxConnLifetime, } // store endpoints @@ -143,7 +163,25 @@ type connPool struct { backoff util.ExponentialBackoff closeCh chan struct{} closeOnce sync.Once - endpointConnCounts sync.Map // store the conn count of each endpoint + endpointConnCounts sync.Map // store the conn count of each endpoint + maxConnLifetime time.Duration // the max lifetime of a connection +} + +func (p *connPool) expired(conn gnet.Conn) bool { + if conn == nil || p.maxConnLifetime <= 0 { + return false + } + + ctx := conn.Context() + if ctx == nil { + return false + } + + connCtx, ok := ctx.(ConnContext) + if !ok { + return false + } + return connCtx.CreatedAt.Add(p.maxConnLifetime).Before(time.Now()) } func (p *connPool) Get() (gnet.Conn, error) { @@ -204,7 +242,7 @@ func (p *connPool) newConn() (gnet.Conn, error) { func (p *connPool) dialNewConn(ep string) (gnet.Conn, error) { p.log.Debug("dialNewConn()") - conn, err := p.dialer.Dial(ep) + conn, err := p.dialer.Dial(ep, ConnContext{CreatedAt: time.Now(), Endpoint: ep}) if err != nil { p.markUnavailable(ep) return nil, err @@ -278,6 +316,15 @@ func (p *connPool) put(conn gnet.Conn, err error, isNewConn bool) { return } + // if conn is expired, close it + if p.expired(conn) { + p.log.Debug("connection expired, close it, addr:", addr, ", err:", err) 
+ CloseConn(conn, defaultConnCloseDelay) + // after closing the connection there are fewer available connections and the connection count of the endpoint for addr may become unbalanced; although this recursively calls the current function, still append a new connection here + _ = p.appendNewConn(addr) + return + } + select { case p.connChan <- conn: // update the conn count @@ -488,6 +535,17 @@ func (p *connPool) recoverAndRebalance() { reBalanceTicker := time.NewTicker(defaultConnCloseDelay + 30*time.Second) defer reBalanceTicker.Stop() + // clean expired conn every minute + var cleanExpiredConnTicker *time.Ticker + if p.maxConnLifetime > 0 { + cleanExpiredConnTicker = time.NewTicker(1 * time.Minute) + } + defer func() { + if cleanExpiredConnTicker != nil { + cleanExpiredConnTicker.Stop() + } + }() + for { select { case <-recoverTicker.C: @@ -502,10 +560,79 @@ func (p *connPool) recoverAndRebalance() { p.rebalance() case <-p.closeCh: return + default: + if cleanExpiredConnTicker != nil { + select { + case <-cleanExpiredConnTicker.C: + p.cleanExpiredConns() + default: + time.Sleep(time.Second) + } + } } } } +func getRemoteAddr(conn gnet.Conn) string { + if conn == nil { + return "" + } + + addr := conn.RemoteAddr() + if addr != nil { + return addr.String() + } + ctx := conn.Context() + if ctx == nil { + return "" + } + + connCtx, ok := ctx.(ConnContext) + if !ok { + return "" + } + return connCtx.Endpoint +} + +func (p *connPool) cleanExpiredConns() { + p.log.Debug("cleanExpiredConns()") + var leftConns []gnet.Conn + var expiredConns []gnet.Conn +loop: + for i := 0; i < cap(p.connChan); i++ { + select { + case conn := <-p.connChan: + if p.expired(conn) { + expiredConns = append(expiredConns, conn) + continue + } + + // not the expired conn, put it back + leftConns = append(leftConns, conn) + default: + // no more conn, exit the loop + break loop + } + } + + // put the conn back to the chan + for _, left := range leftConns { + select { + case p.connChan <- left: + default: + CloseConn(left, defaultConnCloseDelay) + } + } + + // close the expired conn and append new conn with the same addr + for _, expired := range expiredConns { + 
addr := getRemoteAddr(expired) + p.log.Debug("connection expired, close it, addr:", addr, ", err:", nil) + CloseConn(expired, defaultConnCloseDelay) + _ = p.appendNewConn(addr) + } +} + func (p *connPool) dump() { p.log.Debug("all endpoints:") eps := p.endpoints.Load() @@ -542,7 +669,7 @@ func (p *connPool) recover() bool { } if time.Since(lastUnavailable) > p.backoff.Next(retries) { // try to create new conn - conn, err := p.dialer.Dial(key.(string)) + conn, err := p.dialer.Dial(key.(string), ConnContext{CreatedAt: time.Now(), Endpoint: key.(string)}) if err == nil { p.log.Debug("endpoint recovered, addr: ", key) p.put(conn, nil, true) @@ -675,15 +802,11 @@ func (p *connPool) rebalance() { // add new conn for i := currentCount; i < expectedConnPerEndpoint; i++ { - conn, err := p.dialNewConn(addr) - if err == nil { - p.log.Debug("adding connection for addr: ", addr) - p.put(conn, nil, true) - rebalanced = true - } else { - p.log.Warn("failed to add connection during rebalancing, addr: ", addr, ", err: ", err) - break + err := p.appendNewConn(addr) + if err != nil { + continue } + rebalanced = true } } else if currentCount > expectedConnPerEndpoint { rebalanced = true @@ -699,15 +822,11 @@ func (p *connPool) rebalance() { return true } for i := 0; i < expectedConnPerEndpoint; i++ { - conn, err := p.dialNewConn(addr) - if err == nil { - p.log.Debug("adding connection for addr: ", addr) - p.put(conn, nil, true) - rebalanced = true - } else { - p.log.Warn("failed to add connection during rebalancing, addr: ", addr, ", err: ", err) - break + err := p.appendNewConn(addr) + if err != nil { + continue } + rebalanced = true } return true }) @@ -717,6 +836,22 @@ func (p *connPool) rebalance() { } } +func (p *connPool) appendNewConn(addr string) error { + if addr == "" { + return errors.New("addr is empty") + } + + conn, err := p.dialNewConn(addr) + if err != nil { + p.log.Warn("failed to add connection, addr: ", addr, ", err: ", err) + return err + } + + p.log.Debug("adding 
connection for addr: ", addr) + p.put(conn, nil, true) + return nil +} + func (p *connPool) removeEndpointConn(addr string, count int) { var leftConns []gnet.Conn var removed int @@ -730,7 +865,7 @@ loop: } if remoteAddr.String() == addr { - p.log.Info("reducing connection for addr: ", addr) + p.log.Debug("reducing connection for addr: ", addr) // we do not decrease conn count here, if the frequence of rebalancing is less then defaultConnCloseDelay, may lead to an inaccurate expected conn count per endpoint CloseConn(conn, defaultConnCloseDelay) removed++ diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go index 5ea6781276..3e1238ce94 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go @@ -166,7 +166,7 @@ func (c *client) initConns() error { // minimum connection number per endpoint is 1 connsPerEndpoint := int(math.Ceil(float64(c.options.WorkerNum) * 1.2 / float64(epLen))) - pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, c.log) + pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, c.log, c.options.MaxConnLifetime) if err != nil { return err } @@ -176,7 +176,7 @@ func (c *client) initConns() error { } func (c *client) initFramer() error { - framer, err := framer.NewLengthField(framer.LengthFieldCfg{ + fr, err := framer.NewLengthField(framer.LengthFieldCfg{ MaxFrameLen: 64 * 1024, FieldOffset: 0, FieldLength: 4, @@ -186,7 +186,7 @@ func (c *client) initFramer() error { if err != nil { return err } - c.framer = framer + c.framer = fr return nil } @@ -211,8 +211,8 @@ func (c *client) initWorkers() error { return nil } -func (c *client) Dial(addr string) (gnet.Conn, error) { - return c.netClient.Dial("tcp", addr) +func (c *client) Dial(addr string, ctx any) (gnet.Conn, error) { + return 
c.netClient.DialContext("tcp", addr, ctx) } func (c *client) Send(ctx context.Context, msg Message) error { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go index 490e90ce79..5d4692f077 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go @@ -23,9 +23,10 @@ import ( "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool" "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger" - "github.com/prometheus/client_golang/prometheus" ) const ( @@ -78,6 +79,7 @@ type Options struct { AddColumns map[string]string // addition columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all the message will be added 2 more columns with worldid=xxx and ip=yyy addColumnStr string // the string format of the AddColumns, just a cache, used internal Auth Auth // dataproxy authentication interface + MaxConnLifetime time.Duration // connection max lifetime, default: 0, set to 5m/10m when the servers provide service through CLBs (Cloud Load Balancers) } // ValidateAndSetDefault validates an options and set up the default values diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go index 5265a1a628..de3c228769 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go @@ -182,3 +182,10 @@ func WithAuth(auth Auth) Option { o.Auth = auth } } + +// WithMaxConnLifetime sets MaxConnLifetime +func 
WithMaxConnLifetime(lifetime time.Duration) Option { + return func(o *Options) { + o.MaxConnLifetime = lifetime + } +}