This is an automated email from the ASF dual-hosted git repository. joaoreis pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git
The following commit(s) were added to refs/heads/trunk by this push: new eda74b66 Change logger so it supports structured logging and log levels eda74b66 is described below commit eda74b6688aea847446ea83ab4d9d646534a79ae Author: João Reis <joaor...@apache.org> AuthorDate: Tue Jun 17 16:02:01 2025 +0100 Change logger so it supports structured logging and log levels The logger interface uses printf semantics and lacks the notion of log levels. This commit changes (breaking change) the logger interface so printf semantics are removed in favor of structured logging and adds log levels. It provides 3 built in log implementations: - Default logger - uses log standard library - gocqlzap - uses zap library - gocqlzerolog - uses zerolog library Users can use these implementations as is or they can implement their own using these as examples. Patch by João Reis; reviewed by James Hartig, Bohdan Siryk for CASSGO-9 --- CHANGELOG.md | 11 +- address_translators_test.go | 3 + cluster.go | 28 ++-- cluster_test.go | 9 +- common_test.go | 5 +- conn.go | 24 +-- conn_test.go | 34 ++-- connectionpool.go | 36 ++--- control.go | 93 ++++++++--- control_test.go | 3 + crc_test.go | 3 + debug_off.go | 30 ---- debug_on.go | 30 ---- events.go | 41 ++--- events_test.go | 3 + filters.go | 2 +- filters_test.go | 3 + frame.go | 1 + frame_test.go | 3 + go.mod | 4 +- go.sum | 36 ++++- gocqlzap/zap.go | 88 ++++++++++ gocqlzap/zap_test.go | 80 +++++++++ gocqlzerolog/zerolog.go | 91 +++++++++++ gocqlzerolog/zerolog_test.go | 70 ++++++++ helpers.go | 9 ++ helpers_test.go | 3 + host_source.go | 9 +- hostpool/hostpool.go | 18 +++ hostpool/hostpool_test.go | 18 +++ logger.go | 378 +++++++++++++++++++++++++++++++++++++++++-- metadata_test.go | 9 +- policies.go | 6 +- policies_test.go | 3 + ring.go | 8 +- ring_test.go | 3 + session.go | 35 ++-- session_connect_test.go | 3 + session_test.go | 4 +- snappy/compressor_test.go | 3 + topology.go | 11 +- topology_test.go | 3 + 42 files changed, 1034 insertions(+), 220 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea0d600c..2a1c888b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed - Drop support for old CQL protocol versions: 1 and 2 (CASSGO-75) +- Cleanup of deprecated elements (CASSGO-12) +- Remove global NewBatch function (CASSGO-15) +- Remove deprecated global logger (CASSGO-24) ### Added @@ -27,19 +30,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Move lz4 compressor to lz4 package within the gocql module (CASSGO-32) - Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19) -- Cleanup of deprecated elements (CASSGO-12) -- Remove global NewBatch function (CASSGO-15) - Detailed description for NumConns (CASSGO-3) - Change Batch API to be consistent with Query() (CASSGO-7) -- Added Cassandra 4.0 table options support(CASSGO-13) -- Remove deprecated global logger (CASSGO-24) +- Added Cassandra 4.0 table options support (CASSGO-13) - Bumped actions/upload-artifact and actions/cache versions to v4 in CI workflow (CASSGO-48) - Keep nil slices in MapScan (CASSGO-44) - Improve error messages for marshalling (CASSGO-38) - Remove HostPoolHostPolicy from gocql package (CASSGO-21) - Standardized spelling of datacenter (CASSGO-35) - Refactor HostInfo creation and ConnectAddress() method (CASSGO-45) -- gocql.Compressor interface changes to follow append-like design. Bumped Go version to 1.19 (CASSGO-1) +- gocql.Compressor interface changes to follow append-like design (CASSGO-1) - Refactoring hostpool package test and Expose HostInfo creation (CASSGO-59) - Move "execute batch" methods to Batch type (CASSGO-57) - Make `Session` immutable by removing setters and associated mutex (CASSGO-23) @@ -47,6 +47,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - NativeType removed (CASSGO-43) - `New` and `NewWithError` removed and replaced with `Zero` (CASSGO-43) - Changes to Query and Batch to make them safely reusable (CASSGO-22) +- Change logger interface so it supports structured logging and log levels (CASSGO-9) ### Fixed diff --git a/address_translators_test.go b/address_translators_test.go index eb96a700..a1e4100a 100644 --- a/address_translators_test.go +++ b/address_translators_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/cluster.go b/cluster.go index f18f7297..812e459f 100644 --- a/cluster.go +++ b/cluster.go @@ -259,9 +259,15 @@ type ClusterConfig struct { // If not provided, Dialer will be used instead. HostDialer HostDialer - // Logger for this ClusterConfig. - // If not specified, defaults to the gocql.defaultLogger. - Logger StdLogger + // StructuredLogger for this ClusterConfig. + // + // There are 3 built in implementations of StructuredLogger: + // - std library "log" package: gocql.NewLogger + // - zerolog: gocqlzerolog.NewZerologLogger + // - zap: gocqlzap.NewZapLogger + // + // You can also provide your own logger implementation of the StructuredLogger interface. + Logger StructuredLogger // Tracer will be used for all queries. Alternatively it can be set of on a // per query basis. @@ -318,11 +324,11 @@ func NewCluster(hosts ...string) *ClusterConfig { return cfg } -func (cfg *ClusterConfig) logger() StdLogger { - if cfg.Logger == nil { - return &defaultLogger{} +func (cfg *ClusterConfig) newLogger() StructuredLogger { + if cfg.Logger != nil { + return cfg.Logger } - return cfg.Logger + return NewLogger(LogLevelNone) } // CreateSession initializes the cluster based on this config and returns a @@ -335,14 +341,14 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) { // if defined, to translate the given address and port into a possibly new address // and port, If no AddressTranslator or if an error occurs, the given address and // port will be returned. -func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int) (net.IP, int) { +func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int, logger StructuredLogger) (net.IP, int) { if cfg.AddressTranslator == nil || len(addr) == 0 { return addr, port } newAddr, newPort := cfg.AddressTranslator.Translate(addr, port) - if gocqlDebug { - cfg.logger().Printf("gocql: translating address '%v:%d' to '%v:%d'", addr, port, newAddr, newPort) - } + logger.Debug("Translating address.", + newLogFieldIp("old_addr", addr), newLogFieldInt("old_port", port), + newLogFieldIp("new_addr", newAddr), newLogFieldInt("new_port", newPort)) return newAddr, newPort } diff --git a/cluster_test.go b/cluster_test.go index adc21fd0..16d94ebe 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -60,7 +63,7 @@ func TestNewCluster_WithHosts(t *testing.T) { func TestClusterConfig_translateAddressAndPort_NilTranslator(t *testing.T) { cfg := NewCluster() assertNil(t, "cluster config address translator", cfg.AddressTranslator) - newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 1234) + newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 1234, nopLoggerSingleton) assertTrue(t, "same address as provided", net.ParseIP("10.0.0.1").Equal(newAddr)) assertEqual(t, "translated host and port", 1234, newPort) } @@ -68,7 +71,7 @@ func TestClusterConfig_translateAddressAndPort_NilTranslator(t *testing.T) { func TestClusterConfig_translateAddressAndPort_EmptyAddr(t *testing.T) { cfg := NewCluster() cfg.AddressTranslator = staticAddressTranslator(net.ParseIP("10.10.10.10"), 5432) - newAddr, newPort := cfg.translateAddressPort(net.IP([]byte{}), 0) + newAddr, newPort := cfg.translateAddressPort(net.IP([]byte{}), 0, nopLoggerSingleton) assertTrue(t, "translated address is still empty", len(newAddr) == 0) assertEqual(t, "translated port", 0, newPort) } @@ -76,7 +79,7 @@ func TestClusterConfig_translateAddressAndPort_EmptyAddr(t *testing.T) { func TestClusterConfig_translateAddressAndPort_Success(t *testing.T) { cfg := NewCluster() cfg.AddressTranslator = staticAddressTranslator(net.ParseIP("10.10.10.10"), 5432) - newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 2345) + newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 2345, nopLoggerSingleton) assertTrue(t, "translated address", net.ParseIP("10.10.10.10").Equal(newAddr)) assertEqual(t, "translated port", 5432, newPort) } diff --git a/common_test.go b/common_test.go index 1aa4dba2..27f5b959 100644 --- a/common_test.go +++ b/common_test.go @@ -1,3 +1,6 @@ +//go:build all || unit || integration || ccm || cassandra +// +build all unit integration ccm cassandra + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -85,7 +88,7 @@ func addSslOptions(cluster *ClusterConfig) *ClusterConfig { var initOnce sync.Once func createTable(s *Session, table string) error { - // lets just be really sure + // let's just be really sure if err := s.control.awaitSchemaAgreement(); err != nil { log.Printf("error waiting for schema agreement pre create table=%q err=%v\n", table, err) return err diff --git a/conn.go b/conn.go index 4e1bb4f2..41f7db41 100644 --- a/conn.go +++ b/conn.go @@ -144,19 +144,12 @@ type ConnConfig struct { Authenticator Authenticator AuthProvider func(h *HostInfo) (Authenticator, error) Keepalive time.Duration - Logger StdLogger + Logger StructuredLogger tlsConfig *tls.Config disableCoalesce bool } -func (c *ConnConfig) logger() StdLogger { - if c.Logger == nil { - return &defaultLogger{} - } - return c.Logger -} - type ConnErrorHandler interface { HandleError(conn *Conn, err error, closed bool) } @@ -208,7 +201,7 @@ type Conn struct { timeouts int64 - logger StdLogger + logger StructuredLogger } // connect establishes a connection to a Cassandra node using session's connection config. @@ -715,7 +708,7 @@ func (c *Conn) processFrame(ctx context.Context, r io.Reader) error { delete(c.calls, head.stream) c.mu.Unlock() if call == nil || !ok { - c.logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head) + c.logger.Warning("Received response for stream which has no handler.", newLogFieldString("header", head.String())) return c.discardFrame(r, head) } else if head.stream != call.streamID { panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream)) @@ -1330,12 +1323,19 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer return resp.framer, nil case <-timeoutCh: close(call.timeout) + c.logger.Debug("Request timed out on connection.", + newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress())) c.handleTimeout() return nil, ErrTimeoutNoResponse case <-ctxDone: + c.logger.Debug("Request failed because context elapsed out on connection.", + newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()), + newLogFieldError("ctx_err", ctx.Err())) close(call.timeout) return nil, ctx.Err() case <-c.ctx.Done(): + c.logger.Debug("Request failed because connection closed.", + newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress())) close(call.timeout) return nil, ErrConnectionClosed } @@ -1685,7 +1685,7 @@ func (c *Conn) executeQuery(ctx context.Context, q *internalQuery) *Iter { iter.framer = framer if err := c.awaitSchemaAgreement(ctx); err != nil { // TODO: should have this behind a flag - c.logger.Println(err) + c.logger.Warning("Error while awaiting for schema agreement after a schema change event.", newLogFieldError("err", err)) } // dont return an error from this, might be a good idea to give a warning // though. The impact of this returning an error would be that the cluster @@ -1947,7 +1947,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { goto cont } if !isValidPeer(host) || host.schemaVersion == "" { - c.logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host) + c.logger.Warning("Invalid peer or peer with empty schema_version.", newLogFieldIp("peer", host.ConnectAddress())) continue } diff --git a/conn_test.go b/conn_test.go index 9a358619..0a3bc69b 100644 --- a/conn_test.go +++ b/conn_test.go @@ -41,6 +41,7 @@ import ( "math/rand" "net" "os" + "strconv" "strings" "sync" "sync/atomic" @@ -184,7 +185,7 @@ func newTestSession(proto protoVersion, addresses ...string) (*Session, error) { } func TestDNSLookupConnected(t *testing.T) { - log := &testLogger{} + log := newTestLogger(LogLevelDebug) // Override the defaul DNS resolver and restore at the end failDNS = true @@ -205,13 +206,13 @@ func TestDNSLookupConnected(t *testing.T) { t.Fatal("CreateSession() should have connected") } - if !strings.Contains(log.String(), "gocql: dns error") { + if !strings.Contains(log.String(), "gocql: DNS error") { t.Fatalf("Expected to receive dns error log message - got '%s' instead", log.String()) } } func TestDNSLookupError(t *testing.T) { - log := &testLogger{} + log := newTestLogger(LogLevelDebug) // Override the defaul DNS resolver and restore at the end failDNS = true @@ -229,7 +230,7 @@ func TestDNSLookupError(t *testing.T) { t.Fatal("CreateSession() should have returned an error") } - if !strings.Contains(log.String(), "gocql: dns error") { + if !strings.Contains(log.String(), "gocql: DNS error") { t.Fatalf("Expected to receive dns error log message - got '%s' instead", log.String()) } @@ -240,7 +241,7 @@ func TestDNSLookupError(t *testing.T) { func TestStartupTimeout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - log := &testLogger{} + log := newTestLogger(LogLevelDebug) srv := NewTestServer(t, defaultProto, ctx) defer srv.Stop() @@ -348,17 +349,20 @@ func TestCancel(t *testing.T) { type testQueryObserver struct { metrics map[string]*hostMetrics - verbose bool - logger StdLogger + logger StructuredLogger } func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) { host := q.Host.ConnectAddress().String() o.metrics[host] = q.Metrics - if o.verbose { - o.logger.Printf("Observed query %q. Returned %v rows, took %v on host %q with %v attempts and total latency %v. Error: %q\n", - q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Metrics.Attempts, q.Metrics.TotalLatency, q.Err) - } + o.logger.Debug("Observed query.", + newLogFieldString("stmt", q.Statement), + newLogFieldInt("rows", q.Rows), + newLogFieldString("duration", q.End.Sub(q.Start).String()), + newLogFieldString("host", host), + newLogFieldInt("attempts", q.Metrics.Attempts), + newLogFieldString("latency", strconv.FormatInt(q.Metrics.TotalLatency, 10)), + newLogFieldError("err", q.Err)) } func (o *testQueryObserver) GetMetrics(host *HostInfo) *hostMetrics { @@ -411,7 +415,7 @@ func TestQueryRetry(t *testing.T) { } func TestQueryMultinodeWithMetrics(t *testing.T) { - log := &testLogger{} + log := newTestLogger(LogLevelNone) defer func() { os.Stdout.WriteString(log.String()) }() @@ -439,7 +443,7 @@ func TestQueryMultinodeWithMetrics(t *testing.T) { // 1 retry per host rt := &SimpleRetryPolicy{NumRetries: 3} - observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), verbose: false, logger: log} + observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), logger: log} qry := db.Query("kill").RetryPolicy(rt).Observer(observer).Idempotent(true) iter := qry.Iter() err = iter.Close() @@ -488,7 +492,7 @@ func (t *testRetryPolicy) GetRetryType(err error) RetryType { } func TestSpeculativeExecution(t *testing.T) { - log := &testLogger{} + log := newTestLogger(LogLevelDebug) defer func() { os.Stdout.WriteString(log.String()) }() @@ -724,7 +728,7 @@ func TestStream0(t *testing.T) { session: &Session{ types: GlobalTypes, }, - logger: &defaultLogger{}, + logger: NewLogger(LogLevelNone), } err := conn.recv(context.Background(), false) diff --git a/connectionpool.go b/connectionpool.go index 9b8295e7..f316b569 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -154,7 +154,7 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) { Authenticator: cfg.Authenticator, AuthProvider: cfg.AuthProvider, Keepalive: cfg.SocketKeepalive, - Logger: cfg.logger(), + Logger: cfg.Logger, }, nil } @@ -310,7 +310,7 @@ type hostConnPool struct { filling bool pos uint32 - logger StdLogger + logger StructuredLogger } func (h *hostConnPool) String() string { @@ -493,21 +493,20 @@ func (pool *hostConnPool) logConnectErr(err error) { if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") { // connection refused // these are typical during a node outage so avoid log spam. - if gocqlDebug { - pool.logger.Printf("gocql: unable to dial %q: %v\n", pool.host, err) - } + pool.logger.Debug("Pool unable to establish a connection to host.", + newLogFieldIp("host_addr", pool.host.ConnectAddress()), newLogFieldString("host_id", pool.host.HostID()), newLogFieldError("err", err)) } else if err != nil { // unexpected error - pool.logger.Printf("error: failed to connect to %q due to error: %v", pool.host, err) + pool.logger.Debug("Pool failed to connect to host due to error.", + newLogFieldIp("host_addr", pool.host.ConnectAddress()), newLogFieldString("host_id", pool.host.HostID()), newLogFieldError("err", err)) } } // transition back to a not-filling state. func (pool *hostConnPool) fillingStopped(err error) { if err != nil { - if gocqlDebug { - pool.logger.Printf("gocql: filling stopped %q: %v\n", pool.host.ConnectAddress(), err) - } + pool.logger.Warning("Connection pool filling failed.", + newLogFieldIp("host_addr", pool.host.ConnectAddress()), newLogFieldString("host_id", pool.host.HostID()), newLogFieldError("err", err)) // wait for some time to avoid back-to-back filling // this provides some time between failed attempts // to fill the pool for the host to recover @@ -523,9 +522,8 @@ func (pool *hostConnPool) fillingStopped(err error) { // if we errored and the size is now zero, make sure the host is marked as down // see https://github.com/apache/cassandra-gocql-driver/issues/1614 - if gocqlDebug { - pool.logger.Printf("gocql: conns of pool after stopped %q: %v\n", host.ConnectAddress(), count) - } + pool.logger.Debug("Logging number of connections of pool after filling stopped.", + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID()), newLogFieldInt("count", count)) if err != nil && count == 0 { if pool.session.cfg.ConvictionPolicy.AddFailure(err, host) { pool.session.handleNodeDown(host.ConnectAddress(), port) @@ -581,10 +579,11 @@ func (pool *hostConnPool) connect() (err error) { break } } - if gocqlDebug { - pool.logger.Printf("gocql: connection failed %q: %v, reconnecting with %T\n", - pool.host.ConnectAddress(), err, reconnectionPolicy) - } + pool.logger.Warning("Pool failed to connect to host. Reconnecting according to the reconnection policy.", + newLogFieldIp("host", pool.host.ConnectAddress()), + newLogFieldString("host_id", pool.host.HostID()), + newLogFieldError("err", err), + newLogFieldString("reconnectionPolicy", fmt.Sprintf("%T", reconnectionPolicy))) time.Sleep(reconnectionPolicy.GetInterval(i)) } @@ -631,9 +630,8 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) { return } - if gocqlDebug { - pool.logger.Printf("gocql: pool connection error %q: %v\n", conn.addr, err) - } + pool.logger.Info("Pool connection error.", + newLogFieldString("addr", conn.addr), newLogFieldError("err", err)) // find the connection index for i, candidate := range pool.conns { diff --git a/control.go b/control.go index dfc7dc02..c2cb4cb1 100644 --- a/control.go +++ b/control.go @@ -105,18 +105,20 @@ func (c *controlConn) heartBeat() { resp, err := c.writeFrame(&writeOptionsFrame{}) if err != nil { + c.session.logger.Debug("Control connection failed to send heartbeat.", newLogFieldError("err", err)) goto reconn } - switch resp.(type) { + switch actualResp := resp.(type) { case *supportedFrame: // Everything ok sleepTime = 5 * time.Second continue case error: + c.session.logger.Debug("Control connection heartbeat failed.", newLogFieldError("err", actualResp)) goto reconn default: - panic(fmt.Sprintf("gocql: unknown frame in response to options: %T", resp)) + c.session.logger.Error("Unknown frame in response to options.", newLogFieldString("frame_type", fmt.Sprintf("%T", resp))) } reconn: @@ -244,18 +246,25 @@ func (c *controlConn) discoverProtocol(hosts []*HostInfo) (int, error) { } if err == nil { + c.session.logger.Debug("Discovered protocol version using host.", + newLogFieldInt("protocol_version", connCfg.ProtoVersion), newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID())) return connCfg.ProtoVersion, nil } if proto := parseProtocolFromError(err); proto > 0 { + c.session.logger.Debug("Discovered protocol version using host after parsing protocol error.", + newLogFieldInt("protocol_version", proto), newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID())) return proto, nil } + + c.session.logger.Debug("Failed to discover protocol version using host.", + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID()), newLogFieldError("err", err)) } return 0, err } -func (c *controlConn) connect(hosts []*HostInfo) error { +func (c *controlConn) connect(hosts []*HostInfo, sessionInit bool) error { if len(hosts) == 0 { return errors.New("control: no endpoints specified") } @@ -272,14 +281,22 @@ func (c *controlConn) connect(hosts []*HostInfo) error { for _, host := range hosts { conn, err = c.session.dial(c.session.ctx, host, &cfg, c) if err != nil { - c.session.logger.Printf("gocql: unable to dial control conn %v:%v: %v\n", host.ConnectAddress(), host.Port(), err) + c.session.logger.Info("Control connection failed to establish a connection to host.", + newLogFieldIp("host_addr", host.ConnectAddress()), + newLogFieldInt("port", host.Port()), + newLogFieldString("host_id", host.HostID()), + newLogFieldError("err", err)) continue } - err = c.setupConn(conn) + err = c.setupConn(conn, sessionInit) if err == nil { break } - c.session.logger.Printf("gocql: unable setup control conn %v:%v: %v\n", host.ConnectAddress(), host.Port(), err) + c.session.logger.Info("Control connection setup failed after connecting to host.", + newLogFieldIp("host_addr", host.ConnectAddress()), + newLogFieldInt("port", host.Port()), + newLogFieldString("host_id", host.HostID()), + newLogFieldError("err", err)) conn.Close() conn = nil } @@ -300,7 +317,7 @@ type connHost struct { host *HostInfo } -func (c *controlConn) setupConn(conn *Conn) error { +func (c *controlConn) setupConn(conn *Conn, sessionInit bool) error { // we need up-to-date host info for the filterHost call below iter := conn.querySystemLocal(context.TODO()) host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.r.RemoteAddr().(*net.TCPAddr).Port) @@ -308,10 +325,22 @@ func (c *controlConn) setupConn(conn *Conn) error { return err } - host = c.session.ring.addOrUpdate(host) + var exists bool + host, exists = c.session.ring.addOrUpdate(host) if c.session.cfg.filterHost(host) { - return fmt.Errorf("host was filtered: %v", host.ConnectAddress()) + return fmt.Errorf("host was filtered: %v (%s)", host.ConnectAddress(), host.HostID()) + } + + if !exists { + logLevel := LogLevelInfo + msg := "Added control host." + if sessionInit { + logLevel = LogLevelDebug + msg = "Added control host (session initialization)." + } + logHelper(c.session.logger, logLevel, msg, + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID())) } if err := c.registerEvents(conn); err != nil { @@ -324,6 +353,10 @@ func (c *controlConn) setupConn(conn *Conn) error { } c.conn.Store(ch) + + c.session.logger.Info("Control connection connected to host.", + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID())) + if c.session.initialized() { // We connected to control conn, so add the connect the host in pool as well. // Notify session we can start trying to connect to the node. @@ -365,7 +398,7 @@ func (c *controlConn) registerEvents(conn *Conn) error { if err != nil { return err } else if _, ok := frame.(*readyFrame); !ok { - return fmt.Errorf("unexpected frame in response to register: got %T: %v\n", frame, frame) + return fmt.Errorf("unexpected frame in response to register: got %T: %v", frame, frame) } return nil @@ -380,20 +413,25 @@ func (c *controlConn) reconnect() { } defer atomic.StoreInt32(&c.reconnecting, 0) - conn, err := c.attemptReconnect() + _, err := c.attemptReconnect() - if conn == nil { - c.session.logger.Printf("gocql: unable to reconnect control connection: %v\n", err) + if err != nil { + c.session.logger.Error("Unable to reconnect control connection.", + newLogFieldError("err", err)) return } err = c.session.refreshRing() if err != nil { - c.session.logger.Printf("gocql: unable to refresh ring: %v\n", err) + c.session.logger.Warning("Unable to refresh ring.", + newLogFieldError("err", err)) } } func (c *controlConn) attemptReconnect() (*Conn, error) { + + c.session.logger.Debug("Reconnecting the control connection.") + hosts := c.session.ring.allHosts() hosts = shuffleHosts(hosts) @@ -416,8 +454,7 @@ func (c *controlConn) attemptReconnect() (*Conn, error) { return conn, err } - c.session.logger.Printf("gocql: unable to connect to any ring node: %v\n", err) - c.session.logger.Printf("gocql: control falling back to initial contact points.\n") + c.session.logger.Error("Unable to connect to any ring node, control connection falling back to initial contact points.", newLogFieldError("err", err)) // Fallback to initial contact points, as it may be the case that all known initialHosts // changed their IPs while keeping the same hostname(s). initialHosts, resolvErr := addrsToHosts(c.session.cfg.Hosts, c.session.cfg.Port, c.session.logger) @@ -434,14 +471,22 @@ func (c *controlConn) attemptReconnectToAnyOfHosts(hosts []*HostInfo) (*Conn, er for _, host := range hosts { conn, err = c.session.connect(c.session.ctx, host, c) if err != nil { - c.session.logger.Printf("gocql: unable to dial control conn %v:%v: %v\n", host.ConnectAddress(), host.Port(), err) + c.session.logger.Info("During reconnection, control connection failed to establish a connection to host.", + newLogFieldIp("host_addr", host.ConnectAddress()), + newLogFieldInt("port", host.Port()), + newLogFieldString("host_id", host.HostID()), + newLogFieldError("err", err)) continue } - err = c.setupConn(conn) + err = c.setupConn(conn, false) if err == nil { break } - c.session.logger.Printf("gocql: unable setup control conn %v:%v: %v\n", host.ConnectAddress(), host.Port(), err) + c.session.logger.Info("During reconnection, control connection setup failed after connecting to host.", + newLogFieldIp("host_addr", host.ConnectAddress()), + newLogFieldInt("port", host.Port()), + newLogFieldString("host_id", host.HostID()), + newLogFieldError("err", err)) conn.Close() conn = nil } @@ -461,6 +506,11 @@ func (c *controlConn) HandleError(conn *Conn, err error, closed bool) { return } + c.session.logger.Warning("Control connection error.", + newLogFieldIp("host_addr", conn.host.ConnectAddress()), + newLogFieldString("host_id", conn.host.HostID()), + newLogFieldError("err", err)) + c.reconnect() } @@ -522,8 +572,9 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter return conn.executeQuery(qry.Context(), qry) }) - if gocqlDebug && iter.err != nil { - c.session.logger.Printf("control: error executing %q: %v\n", statement, iter.err) + if iter.err != nil { + c.session.logger.Warning("Error executing control connection statement.", + newLogFieldString("statement", statement), newLogFieldError("err", iter.err)) } iter.metrics.attempt(1, 0, c.getConn().host, false) diff --git a/control_test.go b/control_test.go index 9713718e..9f83ec95 100644 --- a/control_test.go +++ b/control_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/crc_test.go b/crc_test.go index cf5e40a3..2556f259 100644 --- a/crc_test.go +++ b/crc_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/debug_off.go b/debug_off.go deleted file mode 100644 index 75b0b0ce..00000000 --- a/debug_off.go +++ /dev/null @@ -1,30 +0,0 @@ -//go:build !gocql_debug -// +build !gocql_debug - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/* - * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 - * Copyright (c) 2016, The Gocql authors, - * provided under the BSD-3-Clause License. - * See the NOTICE file distributed with this work for additional information. - */ - -package gocql - -const gocqlDebug = false diff --git a/debug_on.go b/debug_on.go deleted file mode 100644 index 424394c2..00000000 --- a/debug_on.go +++ /dev/null @@ -1,30 +0,0 @@ -//go:build gocql_debug -// +build gocql_debug - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/* - * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 - * Copyright (c) 2016, The Gocql authors, - * provided under the BSD-3-Clause License. - * See the NOTICE file distributed with this work for additional information. - */ - -package gocql - -const gocqlDebug = true diff --git a/events.go b/events.go index 93b001ac..8f4bd1db 100644 --- a/events.go +++ b/events.go @@ -25,7 +25,10 @@ package gocql import ( + "fmt" "net" + "strconv" + "strings" "sync" "time" ) @@ -39,10 +42,10 @@ type eventDebouncer struct { callback func([]frame) quit chan struct{} - logger StdLogger + logger StructuredLogger } -func newEventDebouncer(name string, eventHandler func([]frame), logger StdLogger) *eventDebouncer { +func newEventDebouncer(name string, eventHandler func([]frame), logger StructuredLogger) *eventDebouncer { e := &eventDebouncer{ name: name, quit: make(chan struct{}), @@ -100,7 +103,8 @@ func (e *eventDebouncer) debounce(frame frame) { if len(e.events) < eventBufferSize { e.events = append(e.events, frame) } else { - e.logger.Printf("%s: buffer full, dropping event frame: %s", e.name, frame) + e.logger.Warning("Event buffer full, dropping event frame.", + newLogFieldString("event_name", e.name), newLogFieldStringer("frame", frame)) } e.mu.Unlock() @@ -109,13 +113,11 @@ func (e *eventDebouncer) debounce(frame frame) { func (s *Session) handleEvent(framer *framer) { frame, err := framer.parseFrame() if err != nil { - s.logger.Printf("gocql: unable to parse event frame: %v\n", err) + s.logger.Error("Unable to parse event frame.", newLogFieldError("err", err)) return } - if gocqlDebug { - s.logger.Printf("gocql: handling frame: %v\n", frame) - } + s.logger.Debug("Handling event frame.", newLogFieldStringer("frame", frame)) switch f := frame.(type) { case *schemaChangeKeyspace, *schemaChangeFunction, @@ -125,7 +127,8 @@ func (s *Session) handleEvent(framer *framer) { case *topologyChangeEventFrame, *statusChangeEventFrame: s.nodeEvents.debounce(frame) default: - s.logger.Printf("gocql: invalid event frame (%T): %v\n", f, f) + s.logger.Error("Invalid event frame.", + newLogFieldString("frame_type", fmt.Sprintf("%T", f)), newLogFieldStringer("frame", f)) } } @@ -177,6 +180,8 @@ func (s *Session) handleNodeEvent(frames []frame) { for _, frame := range frames { switch f := frame.(type) { case *topologyChangeEventFrame: + s.logger.Info("Received topology change event.", + newLogFieldString("frame", strings.Join([]string{f.change, "->", f.host.String(), ":", strconv.Itoa(f.port)}, ""))) topologyEventReceived = true case *statusChangeEventFrame: event, ok := sEvents[f.host.String()] @@ -193,9 +198,8 @@ func (s *Session) handleNodeEvent(frames []frame) { } for _, f := range sEvents { - if gocqlDebug { - s.logger.Printf("gocql: dispatching status change event: %+v\n", f) - } + s.logger.Info("Dispatching status change event.", + newLogFieldString("frame", strings.Join([]string{f.change, "->", f.host.String(), ":", strconv.Itoa(f.port)}, ""))) // ignore events we received if they were disabled // see https://github.com/apache/cassandra-gocql-driver/issues/1591 @@ -213,9 +217,8 @@ func (s *Session) handleNodeEvent(frames []frame) { } func (s *Session) handleNodeUp(eventIp net.IP, eventPort int) { - if gocqlDebug { - s.logger.Printf("gocql: Session.handleNodeUp: %s:%d\n", eventIp.String(), eventPort) - } + s.logger.Info("Node is UP.", + newLogFieldStringer("event_ip", eventIp), newLogFieldInt("event_port", eventPort)) host, ok := s.ring.getHostByIP(eventIp.String()) if !ok { @@ -240,9 +243,8 @@ func (s *Session) startPoolFill(host *HostInfo) { } func (s *Session) handleNodeConnected(host *HostInfo) { - if gocqlDebug { - s.logger.Printf("gocql: Session.handleNodeConnected: %s:%d\n", host.ConnectAddress(), host.Port()) - } + s.logger.Debug("Pool connected to node.", + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldInt("port", host.Port()), newLogFieldString("host_id", host.HostID())) host.setState(NodeUp) @@ -252,9 +254,8 @@ func (s *Session) handleNodeConnected(host *HostInfo) { } func (s *Session) handleNodeDown(ip net.IP, port int) { - if gocqlDebug { - s.logger.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port) - } + s.logger.Warning("Node is DOWN.", + newLogFieldIp("host_addr", ip), newLogFieldInt("port", port)) host, ok := s.ring.getHostByIP(ip.String()) if ok { diff --git a/events_test.go b/events_test.go index 537c5188..cf088e3d 100644 --- a/events_test.go +++ b/events_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/filters.go b/filters.go index 312bd0d1..c258dadd 100644 --- a/filters.go +++ b/filters.go @@ -70,7 +70,7 @@ func DataCentreHostFilter(dataCenter string) HostFilter { // WhiteListHostFilter filters incoming hosts by checking that their address is // in the initial hosts whitelist. func WhiteListHostFilter(hosts ...string) HostFilter { - hostInfos, err := addrsToHosts(hosts, 9042, nopLogger{}) + hostInfos, err := addrsToHosts(hosts, 9042, nopLoggerSingleton) if err != nil { // dont want to panic here, but rather not break the API panic(fmt.Errorf("unable to lookup host info from address: %v", err)) diff --git a/filters_test.go b/filters_test.go index a1abec20..2469422b 100644 --- a/filters_test.go +++ b/filters_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/frame.go b/frame.go index a48279d3..93af2c67 100644 --- a/frame.go +++ b/frame.go @@ -411,6 +411,7 @@ func newFramer(compressor Compressor, version byte, r *RegisteredTypes) *framer type frame interface { Header() frameHeader + String() string } func readHeader(r io.Reader, p []byte) (head frameHeader, err error) { diff --git a/frame_test.go b/frame_test.go index e2270d2c..e7a8bb4b 100644 --- a/frame_test.go +++ b/frame_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/go.mod b/go.mod index c4505dc4..93a5ca65 100644 --- a/go.mod +++ b/go.mod @@ -21,14 +21,16 @@ require ( github.com/golang/snappy v0.0.3 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed github.com/pierrec/lz4/v4 v4.1.8 + github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.9.0 + go.uber.org/zap v1.27.0 gopkg.in/inf.v0 v0.9.1 ) require ( github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect - github.com/kr/pretty v0.1.0 // indirect + github.com/kr/pretty v0.3.1 // indirect ) go 1.13 diff --git a/go.sum b/go.sum index 14c301fe..b0a2adec 100644 --- a/go.sum +++ b/go.sum @@ -2,33 +2,63 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYE github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= +github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/gocqlzap/zap.go b/gocqlzap/zap.go new file mode 100644 index 00000000..0b7dbfcd --- /dev/null +++ b/gocqlzap/zap.go @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocqlzap + +import ( + "go.uber.org/zap" + + "github.com/gocql/gocql" +) + +const DefaultName = "gocql" + +type Logger interface { + gocql.StructuredLogger + ZapLogger() *zap.Logger +} + +type logger struct { + zapLogger *zap.Logger +} + +// NewZapLogger creates a new zap based logger with the logger name set to DefaultName +func NewZapLogger(l *zap.Logger) Logger { + return &logger{zapLogger: l.Named(DefaultName)} +} + +// NewUnnamedZapLogger doesn't set the logger name so the user can set the name of the logger +// before providing it to this function (or just leave it unset) +func NewUnnamedZapLogger(l *zap.Logger) Logger { + return &logger{zapLogger: l} +} + +func (rec *logger) ZapLogger() *zap.Logger { + return rec.zapLogger +} + +func (rec *logger) log(fields []gocql.LogField) *zap.Logger { + childLogger := rec.zapLogger + for _, field := range fields { + childLogger = childLogger.WithLazy(zapField(field)) + } + return childLogger +} + +func zapField(field gocql.LogField) zap.Field { + switch field.Value.LogFieldValueType() { + case gocql.LogFieldTypeBool: + return zap.Bool(field.Name, field.Value.Bool()) + case gocql.LogFieldTypeInt64: + return zap.Int64(field.Name, field.Value.Int64()) + case gocql.LogFieldTypeString: + return zap.String(field.Name, field.Value.String()) + default: + return zap.Any(field.Name, field.Value.Any()) + } +} + +func (rec *logger) Error(msg string, fields ...gocql.LogField) { + rec.log(fields).Error(msg) +} + +func (rec *logger) Warning(msg string, fields ...gocql.LogField) { + rec.log(fields).Warn(msg) +} + +func (rec *logger) Info(msg string, fields ...gocql.LogField) { + rec.log(fields).Info(msg) +} + +func (rec *logger) Debug(msg string, fields ...gocql.LogField) { + rec.log(fields).Debug(msg) +} diff --git a/gocqlzap/zap_test.go b/gocqlzap/zap_test.go new file mode 100644 index 00000000..7ac0fb6d --- /dev/null +++ b/gocqlzap/zap_test.go @@ -0,0 +1,80 @@ +//go:build all || unit +// +build all unit + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocqlzap + +import ( + "bytes" + "io" + "strings" + "testing" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/gocql/gocql" +) + +const logLineEnding = "%%%\n%%%" + +func NewCustomLogger(pipeTo io.Writer) zapcore.Core { + cfg := zap.NewProductionEncoderConfig() + cfg.LineEnding = logLineEnding + return zapcore.NewCore( + zapcore.NewConsoleEncoder(cfg), + zapcore.AddSync(pipeTo), + zapcore.DebugLevel, + ) +} + +func TestGocqlZapLog(t *testing.T) { + b := &bytes.Buffer{} + logger := zap.New(NewCustomLogger(b)) + clusterCfg := gocql.NewCluster("0.0.0.1") + clusterCfg.Logger = NewZapLogger(logger) + clusterCfg.ProtoVersion = 4 + session, err := clusterCfg.CreateSession() + if err == nil { + session.Close() + t.Fatal("expected error creating session") + } + err = logger.Sync() + if err != nil { + t.Fatal("logger sync failed") + } + logOutput := strings.Split(b.String(), logLineEnding) + found := false + for _, logEntry := range logOutput { + if len(logEntry) == 0 { + continue + } + if !strings.Contains(logEntry, "info\tgocql\tControl connection failed to establish a connection to host.\t{\"host_addr\": "+ + "\"0.0.0.1\", \"port\": 9042, \"host_id\": \"\", \"err\": \"dial tcp 0.0.0.1:9042:") { + continue + } else { + found = true + break + } + } + if !found { + t.Fatal("log output didn't match expectations: ", strings.Join(logOutput, "\n")) + } +} diff --git a/gocqlzerolog/zerolog.go b/gocqlzerolog/zerolog.go new file mode 100644 index 00000000..ee2e7021 --- /dev/null +++ b/gocqlzerolog/zerolog.go @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocqlzerolog + +import ( + "github.com/rs/zerolog" + + "github.com/gocql/gocql" +) + +const DefaultName = "gocql" +const DefaultNameField = "logger" + +type Logger interface { + gocql.StructuredLogger + ZerologLogger() zerolog.Logger +} + +type logger struct { + zerologLogger zerolog.Logger +} + +// NewZerologLogger creates a new zerolog based logger with a global context containing a field +// with name "logger" and value "gocql", i.e.: +// +// l.With().Str("logger", "gocql").Logger() +func NewZerologLogger(l zerolog.Logger) Logger { + return &logger{zerologLogger: l.With().Str(DefaultNameField, DefaultName).Logger()} +} + +// NewUnnamedZerologLogger creates a new zerolog based logger without modifying its context like +// NewZerologLogger does. +func NewUnnamedZerologLogger(l zerolog.Logger) Logger { + return &logger{zerologLogger: l} +} + +func (rec *logger) ZerologLogger() zerolog.Logger { + return rec.zerologLogger +} + +func (rec *logger) log(event *zerolog.Event, fields ...gocql.LogField) *zerolog.Event { + for _, field := range fields { + event = zerologEvent(event, field) + } + return event +} + +func zerologEvent(event *zerolog.Event, field gocql.LogField) *zerolog.Event { + switch field.Value.LogFieldValueType() { + case gocql.LogFieldTypeBool: + return event.Bool(field.Name, field.Value.Bool()) + case gocql.LogFieldTypeInt64: + return event.Int64(field.Name, field.Value.Int64()) + case gocql.LogFieldTypeString: + return event.Str(field.Name, field.Value.String()) + default: + return event.Any(field.Name, field.Value.Any()) + } +} + +func (rec *logger) Error(msg string, fields ...gocql.LogField) { + rec.log(rec.zerologLogger.Error(), fields...).Msg(msg) +} + +func (rec *logger) Warning(msg string, fields ...gocql.LogField) { + rec.log(rec.zerologLogger.Warn(), fields...).Msg(msg) +} + +func (rec *logger) Info(msg string, fields ...gocql.LogField) { + rec.log(rec.zerologLogger.Info(), fields...).Msg(msg) +} + +func (rec *logger) Debug(msg string, fields ...gocql.LogField) { + rec.log(rec.zerologLogger.Debug(), fields...).Msg(msg) +} diff --git a/gocqlzerolog/zerolog_test.go b/gocqlzerolog/zerolog_test.go new file mode 100644 index 00000000..66c54961 --- /dev/null +++ b/gocqlzerolog/zerolog_test.go @@ -0,0 +1,70 @@ +//go:build all || unit +// +build all unit + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocqlzerolog + +import ( + "bytes" + "strings" + "testing" + + "github.com/rs/zerolog" + + "github.com/gocql/gocql" +) + +const logLineEnding = "%%%\n%%%" + +func TestGocqlZeroLog(t *testing.T) { + b := &bytes.Buffer{} + output := zerolog.ConsoleWriter{Out: b} + output.NoColor = true + output.FormatExtra = func(m map[string]interface{}, buffer *bytes.Buffer) error { + buffer.WriteString(logLineEnding) + return nil + } + logger := zerolog.New(output).Level(zerolog.DebugLevel) + clusterCfg := gocql.NewCluster("0.0.0.1") + clusterCfg.Logger = NewZerologLogger(logger) + clusterCfg.ProtoVersion = 4 + session, err := clusterCfg.CreateSession() + if err == nil { + session.Close() + t.Fatal("expected error creating session") + } + logOutput := strings.Split(b.String(), logLineEnding+"\n") + found := false + for _, logEntry := range logOutput { + if len(logEntry) == 0 { + continue + } + if !strings.Contains(logEntry, "Control connection failed to establish a connection to host.") || + !strings.Contains(logEntry, "host_addr=0.0.0.1 host_id= logger=gocql port=9042") { + continue + } else { + found = true + break + } + } + if !found { + t.Fatal("log output didn't match expectations: ", strings.Join(logOutput, "\n")) + } +} diff --git a/helpers.go b/helpers.go index a391a11e..baae5ce0 100644 --- a/helpers.go +++ b/helpers.go @@ -25,6 +25,7 @@ package gocql import ( + "bytes" "fmt" "net" "reflect" @@ -202,3 +203,11 @@ func LookupIP(host string) ([]net.IP, error) { return net.LookupIP(host) } + +func ringString(hosts []*HostInfo) string { + buf := new(bytes.Buffer) + for _, h := range hosts { + buf.WriteString("[" + h.ConnectAddress().String() + "-" + h.HostID() + ":" + h.State().String() + "]") + } + return buf.String() +} diff --git a/helpers_test.go b/helpers_test.go index 45903a69..bcd727c5 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/host_source.go b/host_source.go index ece5d3e5..7e88cf9d 100644 --- a/host_source.go +++ b/host_source.go @@ -610,7 +610,7 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (* // Not sure what the port field will be called until the JIRA issue is complete } - ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port) + ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port, s.logger) if !validIpAddr(ip) { return nil, fmt.Errorf("invalid host address (before translation: %v:%v, after translation: %v:%v)", host.ConnectAddress(), host.port, ip.String(), port) } @@ -687,8 +687,8 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er return nil, err } else if !isValidPeer(host) { // If it's not a valid peer - r.session.logger.Printf("Found invalid peer '%s' "+ - "Likely due to a gossip or snitch issue, this host will be ignored", host) + r.session.logger.Warning("Found invalid peer "+ + "likely due to a gossip or snitch issue, this host will be ignored.", newLogFieldStringer("host", host)) continue } @@ -760,6 +760,7 @@ func refreshRing(r *ringDescriber) error { } if host, ok := r.session.ring.addHostIfMissing(h); !ok { + r.session.logger.Info("Adding host.", newLogFieldIp("host_addr", h.ConnectAddress()), newLogFieldString("host_id", h.HostID())) r.session.startPoolFill(h) } else { // host (by hostID) already exists; determine if IP has changed @@ -778,6 +779,7 @@ func refreshRing(r *ringDescriber) error { if _, alreadyExists := r.session.ring.addHostIfMissing(h); alreadyExists { return fmt.Errorf("add new host=%s after removal: %w", h, ErrHostAlreadyExists) } + r.session.logger.Info("Adding host with new IP after removing old host.", newLogFieldIp("host_addr", h.ConnectAddress()), newLogFieldString("host_id", h.HostID())) // add new HostInfo (same hostID, new IP) r.session.startPoolFill(h) } @@ -791,6 +793,7 @@ func refreshRing(r *ringDescriber) error { r.session.metadata.setPartitioner(partitioner) r.session.policy.SetPartitioner(partitioner) + r.session.logger.Info("Refreshed ring.", newLogFieldString("ring", ringString(r.session.ring.allHosts()))) return nil } diff --git a/hostpool/hostpool.go b/hostpool/hostpool.go index f3a3d0f6..c821a1c6 100644 --- a/hostpool/hostpool.go +++ b/hostpool/hostpool.go @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package hostpool import ( diff --git a/hostpool/hostpool_test.go b/hostpool/hostpool_test.go index 10064339..75e3cc56 100644 --- a/hostpool/hostpool_test.go +++ b/hostpool/hostpool_test.go @@ -1,6 +1,24 @@ //go:build all || unit // +build all unit +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package hostpool import ( diff --git a/logger.go b/logger.go index 1520111f..e01bf684 100644 --- a/logger.go +++ b/logger.go @@ -28,33 +28,379 @@ import ( "bytes" "fmt" "log" + "net" + "strconv" + "strings" + "sync" ) -type StdLogger interface { - Print(v ...interface{}) - Printf(format string, v ...interface{}) - Println(v ...interface{}) +// Deprecated: use StructuredLogger instead +type StdLogger interface{} + +func logHelper(logger StructuredLogger, level LogLevel, msg string, fields ...LogField) { + switch level { + case LogLevelDebug: + logger.Debug(msg, fields...) + case LogLevelInfo: + logger.Info(msg, fields...) + case LogLevelWarn: + logger.Warning(msg, fields...) + case LogLevelError: + logger.Error(msg, fields...) + default: + logger.Error("Unknown log level", newLogFieldInt("level", int(level)), newLogFieldString("msg", msg)) + } } type nopLogger struct{} -func (n nopLogger) Print(_ ...interface{}) {} +func (n nopLogger) Error(_ string, _ ...LogField) {} + +func (n nopLogger) Warning(_ string, _ ...LogField) {} + +func (n nopLogger) Info(_ string, _ ...LogField) {} -func (n nopLogger) Printf(_ string, _ ...interface{}) {} +func (n nopLogger) Debug(_ string, _ ...LogField) {} -func (n nopLogger) Println(_ ...interface{}) {} +var nopLoggerSingleton = &nopLogger{} type testLogger struct { - capture bytes.Buffer + logLevel LogLevel + capture bytes.Buffer + mu sync.Mutex +} + +func newTestLogger(logLevel LogLevel) *testLogger { + return &testLogger{logLevel: logLevel} +} + +func (l *testLogger) Error(msg string, fields ...LogField) { + if LogLevelError <= l.logLevel { + l.write("ERR gocql: ", msg, fields) + } +} + +func (l *testLogger) Warning(msg string, fields ...LogField) { + if LogLevelWarn <= l.logLevel { + l.write("WRN gocql: ", msg, fields) + } +} + +func (l *testLogger) Info(msg string, fields ...LogField) { + if LogLevelInfo <= l.logLevel { + l.write("INF gocql: ", msg, fields) + } +} + +func (l *testLogger) Debug(msg string, fields ...LogField) { + if LogLevelDebug <= l.logLevel { + l.write("DBG gocql: ", msg, fields) + } +} + +func (l *testLogger) write(prefix string, msg string, fields []LogField) { + buf := bytes.Buffer{} + writeLogMsg(&buf, prefix, msg, fields) + l.mu.Lock() + defer l.mu.Unlock() + l.capture.WriteString(buf.String() + "\n") +} + +func (l *testLogger) String() string { + l.mu.Lock() + defer l.mu.Unlock() + return l.capture.String() +} + +type defaultLogger struct { + logLevel LogLevel +} + +// NewLogger creates a StructuredLogger that uses the standard library log package. +// +// This logger will write log messages in the following format: +// +// <LOG_LEVEL> gocql: <message> <fields[0].Name>=<fields[0].Value> <fields[1].Name>=<fields[1].Value> +// +// LOG_LEVEL is always a 3 letter string: +// - DEBUG -> DBG +// - INFO -> INF +// - WARNING -> WRN +// - ERROR -> ERR +// +// Example: +// +// INF gocql: Adding host (session initialization). host_addr=127.0.0.1 host_id=a21dd06e-9e7e-4528-8ad7-039604e25e73 +func NewLogger(logLevel LogLevel) StructuredLogger { + return &defaultLogger{logLevel: logLevel} +} + +func (l *defaultLogger) Error(msg string, fields ...LogField) { + if LogLevelError <= l.logLevel { + l.write("ERR gocql: ", msg, fields) + } +} + +func (l *defaultLogger) Warning(msg string, fields ...LogField) { + if LogLevelWarn <= l.logLevel { + l.write("WRN gocql: ", msg, fields) + } +} + +func (l *defaultLogger) Info(msg string, fields ...LogField) { + if LogLevelInfo <= l.logLevel { + l.write("INF gocql: ", msg, fields) + } +} + +func (l *defaultLogger) Debug(msg string, fields ...LogField) { + if LogLevelDebug <= l.logLevel { + l.write("DBG gocql: ", msg, fields) + } +} + +func (l *defaultLogger) write(prefix string, msg string, fields []LogField) { + buf := bytes.Buffer{} + writeLogMsg(&buf, prefix, msg, fields) + log.Println(buf.String()) +} + +func writeFields(buf *bytes.Buffer, fields []LogField) { + for i, field := range fields { + if i > 0 { + buf.WriteRune(' ') + } + buf.WriteString(field.Name) + buf.WriteRune('=') + buf.WriteString(field.Value.String()) + } +} + +func writeLogMsg(buf *bytes.Buffer, prefix string, msg string, fields []LogField) { + buf.WriteString(prefix) + buf.WriteString(msg) + buf.WriteRune(' ') + writeFields(buf, fields) +} + +type LogLevel int + +const ( + LogLevelDebug = LogLevel(5) + LogLevelInfo = LogLevel(4) + LogLevelWarn = LogLevel(3) + LogLevelError = LogLevel(2) + LogLevelNone = LogLevel(0) +) + +func (recv LogLevel) String() string { + switch recv { + case LogLevelDebug: + return "debug" + case LogLevelInfo: + return "info" + case LogLevelWarn: + return "warn" + case LogLevelError: + return "error" + case LogLevelNone: + return "none" + default: + // fmt.sprintf allocates so use strings.Join instead + temp := [2]string{"invalid level ", strconv.Itoa(int(recv))} + return strings.Join(temp[:], "") + } +} + +type LogField struct { + Name string + Value LogFieldValue +} + +func newLogField(name string, value LogFieldValue) LogField { + return LogField{ + Name: name, + Value: value, + } } -func (l *testLogger) Print(v ...interface{}) { fmt.Fprint(&l.capture, v...) } -func (l *testLogger) Printf(format string, v ...interface{}) { fmt.Fprintf(&l.capture, format, v...) } -func (l *testLogger) Println(v ...interface{}) { fmt.Fprintln(&l.capture, v...) } -func (l *testLogger) String() string { return l.capture.String() } +func newLogFieldIp(name string, value net.IP) LogField { + var str string + if value == nil { + str = "<nil>" + } else { + str = value.String() + } + return newLogField(name, logFieldValueString(str)) +} + +func newLogFieldError(name string, value error) LogField { + var str string + if value != nil { + str = value.Error() + } + return newLogField(name, logFieldValueString(str)) +} + +func newLogFieldStringer(name string, value fmt.Stringer) LogField { + var str string + if value != nil { + str = value.String() + } + return newLogField(name, logFieldValueString(str)) +} + +func newLogFieldString(name string, value string) LogField { + return newLogField(name, logFieldValueString(value)) +} + +func newLogFieldInt(name string, value int) LogField { + return newLogField(name, logFieldValueInt64(int64(value))) +} + +func newLogFieldBool(name string, value bool) LogField { + return newLogField(name, logFieldValueBool(value)) +} + +type StructuredLogger interface { + Error(msg string, fields ...LogField) + Warning(msg string, fields ...LogField) + Info(msg string, fields ...LogField) + Debug(msg string, fields ...LogField) +} + +// A LogFieldValue can represent any Go value, but unlike type any, +// it can represent most small values without an allocation. +// The zero Value corresponds to nil. +type LogFieldValue struct { + num uint64 + any interface{} +} + +// LogFieldValueType is the type of a LogFieldValue. +type LogFieldValueType int + +// It's important that LogFieldTypeAny is 0 so that a zero Value represents nil. +const ( + LogFieldTypeAny LogFieldValueType = iota + LogFieldTypeBool + LogFieldTypeInt64 + LogFieldTypeString +) + +// LogFieldValueType returns v's LogFieldValueType. +func (v LogFieldValue) LogFieldValueType() LogFieldValueType { + switch x := v.any.(type) { + case LogFieldValueType: + return x + case string: + return LogFieldTypeString + default: + return LogFieldTypeAny + } +} + +func logFieldValueString(value string) LogFieldValue { + return LogFieldValue{any: value} +} -type defaultLogger struct{} +func logFieldValueInt(v int) LogFieldValue { + return logFieldValueInt64(int64(v)) +} + +func logFieldValueInt64(v int64) LogFieldValue { + return LogFieldValue{num: uint64(v), any: LogFieldTypeInt64} +} + +func logFieldValueBool(v bool) LogFieldValue { + u := uint64(0) + if v { + u = 1 + } + return LogFieldValue{num: u, any: LogFieldTypeBool} +} + +// Any returns v's value as an interface. +func (v LogFieldValue) Any() interface{} { + switch v.LogFieldValueType() { + case LogFieldTypeAny: + if k, ok := v.any.(LogFieldValueType); ok { + return k + } + return v.any + case LogFieldTypeInt64: + return int64(v.num) + case LogFieldTypeString: + return v.str() + case LogFieldTypeBool: + return v.bool() + default: + panic(fmt.Sprintf("bad value type: %s", v.LogFieldValueType())) + } +} + +// String returns LogFieldValue's value as a string, formatted like fmt.Sprint. +// +// Unlike the methods Int64 and Bool which panic if v is of the +// wrong LogFieldValueType, String never panics +// (i.e. it can be called for any LogFieldValueType, not just LogFieldTypeString) +func (v LogFieldValue) String() string { + return v.stringValue() +} + +func (v LogFieldValue) str() string { + return v.any.(string) +} -func (l *defaultLogger) Print(v ...interface{}) { log.Print(v...) } -func (l *defaultLogger) Printf(format string, v ...interface{}) { log.Printf(format, v...) } -func (l *defaultLogger) Println(v ...interface{}) { log.Println(v...) } +// Int64 returns v's value as an int64. It panics +// if v is not a signed integer. +func (v LogFieldValue) Int64() int64 { + if g, w := v.LogFieldValueType(), LogFieldTypeInt64; g != w { + panic(fmt.Sprintf("Value type is %s, not %s", g, w)) + } + return int64(v.num) +} + +// Bool returns v's value as a bool. It panics +// if v is not a bool. +func (v LogFieldValue) Bool() bool { + if g, w := v.LogFieldValueType(), LogFieldTypeBool; g != w { + panic(fmt.Sprintf("Value type is %s, not %s", g, w)) + } + return v.bool() +} + +func (v LogFieldValue) bool() bool { + return v.num == 1 +} + +// stringValue returns a text representation of v. +// v is formatted as with fmt.Sprint. +func (v LogFieldValue) stringValue() string { + switch v.LogFieldValueType() { + case LogFieldTypeString: + return v.str() + case LogFieldTypeInt64: + return strconv.FormatInt(int64(v.num), 10) + case LogFieldTypeBool: + return strconv.FormatBool(v.bool()) + case LogFieldTypeAny: + return fmt.Sprint(v.any) + default: + panic(fmt.Sprintf("bad value type: %s", v.LogFieldValueType())) + } +} + +var logFieldValueTypeStrings = []string{ + "Any", + "Bool", + "Int64", + "String", +} + +func (t LogFieldValueType) String() string { + if t >= 0 && int(t) < len(logFieldValueTypeStrings) { + return logFieldValueTypeStrings[t] + } + return "<unknown gocql.LogFieldValueType>" +} diff --git a/metadata_test.go b/metadata_test.go index b6a7a88f..e2af014e 100644 --- a/metadata_test.go +++ b/metadata_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + // Copyright (c) 2015 The gocql Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -40,7 +43,7 @@ func TestCompileMetadata(t *testing.T) { cfg: ClusterConfig{ ProtoVersion: 1, }, - logger: &defaultLogger{}, + logger: NewLogger(LogLevelNone), types: GlobalTypes, } // V2 test - V2+ protocol is simpler so here are some toy examples to verify that the mapping works @@ -431,7 +434,7 @@ func assertParseNonCompositeType( cfg: ClusterConfig{ ProtoVersion: 4, }, - logger: &defaultLogger{}, + logger: NewLogger(LogLevelNone), types: GlobalTypes, } result, err := parseType(session, def) @@ -472,7 +475,7 @@ func assertParseCompositeType( cfg: ClusterConfig{ ProtoVersion: 4, }, - logger: &defaultLogger{}, + logger: NewLogger(LogLevelNone), types: GlobalTypes, } result, err := parseType(session, def) diff --git a/policies.go b/policies.go index c9f3399f..8db55bf2 100644 --- a/policies.go +++ b/policies.go @@ -417,7 +417,7 @@ type tokenAwareHostPolicy struct { partitioner string metadata atomic.Value // *clusterMeta - logger StdLogger + logger StructuredLogger } func (t *tokenAwareHostPolicy) Init(s *Session) { @@ -560,7 +560,7 @@ func (t *tokenAwareHostPolicy) getMetadataForUpdate() *clusterMeta { // resetTokenRing creates a new tokenRing. // It must be called with t.mu locked. -func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo, logger StdLogger) { +func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo, logger StructuredLogger) { if partitioner == "" { // partitioner not yet set return @@ -569,7 +569,7 @@ func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo, logg // create a new token ring tokenRing, err := newTokenRing(partitioner, hosts) if err != nil { - logger.Printf("Unable to update the token ring due to error: %s", err) + logger.Warning("Unable to update the token ring due to error.", newLogFieldError("err", err)) return } diff --git a/policies_test.go b/policies_test.go index 540742a0..ab40a62a 100644 --- a/policies_test.go +++ b/policies_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + // Copyright (c) 2015 The gocql Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/ring.go b/ring.go index 1dbf07c4..d1116226 100644 --- a/ring.go +++ b/ring.go @@ -94,12 +94,12 @@ func (r *ring) currentHosts() map[string]*HostInfo { return hosts } -func (r *ring) addOrUpdate(host *HostInfo) *HostInfo { - if existingHost, ok := r.addHostIfMissing(host); ok { +func (r *ring) addOrUpdate(host *HostInfo) (*HostInfo, bool) { + existingHost, ok := r.addHostIfMissing(host) + if ok { existingHost.update(host) - host = existingHost } - return host + return existingHost, ok } func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) { diff --git a/ring_test.go b/ring_test.go index 3e9533ec..8d1c094e 100644 --- a/ring_test.go +++ b/ring_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/session.go b/session.go index dcd0b620..45206cd0 100644 --- a/session.go +++ b/session.go @@ -101,17 +101,17 @@ type Session struct { // you can use initialized() to read the value. isInitialized bool - logger StdLogger + logger StructuredLogger } -func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInfo, error) { +func addrsToHosts(addrs []string, defaultPort int, logger StructuredLogger) ([]*HostInfo, error) { var hosts []*HostInfo for _, hostaddr := range addrs { resolvedHosts, err := hostInfo(hostaddr, defaultPort) if err != nil { // Try other hosts if unable to resolve DNS name if _, ok := err.(*net.DNSError); ok { - logger.Printf("gocql: dns error: %v\n", err) + logger.Error("DNS error.", newLogFieldError("err", err)) continue } return nil, err @@ -153,7 +153,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) { connectObserver: cfg.ConnectObserver, ctx: ctx, cancel: cancel, - logger: cfg.logger(), + logger: cfg.newLogger(), trace: cfg.Tracer, } if cfg.RegisteredTypes == nil { @@ -234,9 +234,10 @@ func (s *Session) init() error { // TODO(zariel): we really only need this in 1 place s.cfg.ProtoVersion = proto s.connCfg.ProtoVersion = proto + s.logger.Info("Discovered protocol version.", newLogFieldInt("protocol_version", proto)) } - if err := s.control.connect(hosts); err != nil { + if err := s.control.connect(hosts, true); err != nil { return err } @@ -255,6 +256,9 @@ func (s *Session) init() error { } hosts = filteredHosts + s.logger.Info("Refreshed ring.", newLogFieldString("ring", ringString(hosts))) + } else { + s.logger.Info("Not performing a ring refresh because DisableInitialHostLookup is true.") } } @@ -285,10 +289,14 @@ func (s *Session) init() error { // again atomic.AddInt64(&left, 1) for _, host := range hostMap { - host := s.ring.addOrUpdate(host) + host, exists := s.ring.addOrUpdate(host) if s.cfg.filterHost(host) { continue } + if !exists { + s.logger.Info("Adding host (session initialization).", + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID())) + } atomic.AddInt64(&left, 1) go func() { @@ -366,6 +374,7 @@ func (s *Session) init() error { s.isInitialized = true s.sessionStateMu.Unlock() + s.logger.Info("Session initialized successfully.") return nil } @@ -391,21 +400,20 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) { for { select { case <-reconnectTicker.C: + s.logger.Debug("Connecting to downed hosts if there is any.") hosts := s.ring.allHosts() // Print session.ring for debug. - if gocqlDebug { - buf := bytes.NewBufferString("Session.ring:") - for _, h := range hosts { - buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]") - } - s.logger.Println(buf.String()) - } + s.logger.Debug("Logging current ring state.", newLogFieldString("ring", ringString(hosts))) for _, h := range hosts { if h.IsUp() { continue } + s.logger.Debug("Reconnecting to downed host.", + newLogFieldIp("host_addr", h.ConnectAddress()), + newLogFieldInt("host_port", h.Port()), + newLogFieldString("host_id", h.HostID())) // we let the pool call handleNodeConnected to change the host state s.pool.addHost(h) } @@ -524,6 +532,7 @@ func (s *Session) executeQuery(qry *internalQuery) (it *Iter) { } func (s *Session) removeHost(h *HostInfo) { + s.logger.Warning("Removing host.", newLogFieldIp("host_addr", h.ConnectAddress()), newLogFieldString("host_id", h.HostID())) s.policy.RemoveHost(h) hostID := h.HostID() s.pool.removeHost(hostID) diff --git a/session_connect_test.go b/session_connect_test.go index d2097b63..83214b5e 100644 --- a/session_connect_test.go +++ b/session_connect_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/session_test.go b/session_test.go index d414d6f4..3fcb2163 100644 --- a/session_test.go +++ b/session_test.go @@ -40,7 +40,7 @@ func TestSessionAPI(t *testing.T) { cfg: *cfg, cons: Quorum, policy: RoundRobinHostPolicy(), - logger: cfg.logger(), + logger: cfg.newLogger(), } defer s.Close() @@ -168,7 +168,7 @@ func TestBatchBasicAPI(t *testing.T) { s := &Session{ cfg: *cfg, cons: Quorum, - logger: cfg.logger(), + logger: cfg.newLogger(), } defer s.Close() diff --git a/snappy/compressor_test.go b/snappy/compressor_test.go index 3efe3fa7..b87eb5bf 100644 --- a/snappy/compressor_test.go +++ b/snappy/compressor_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/topology.go b/topology.go index 2fc38a88..5a9b50fa 100644 --- a/topology.go +++ b/topology.go @@ -90,12 +90,13 @@ func getReplicationFactorFromOpts(val interface{}) (int, error) { } } -func getStrategy(ks *KeyspaceMetadata, logger StdLogger) placementStrategy { +func getStrategy(ks *KeyspaceMetadata, logger StructuredLogger) placementStrategy { switch { case strings.Contains(ks.StrategyClass, "SimpleStrategy"): rf, err := getReplicationFactorFromOpts(ks.StrategyOptions["replication_factor"]) if err != nil { - logger.Printf("parse rf for keyspace %q: %v", ks.Name, err) + logger.Warning("Failed to parse replication factor of keyspace configured with SimpleStrategy.", + newLogFieldString("keyspace", ks.Name), newLogFieldError("err", err)) return nil } return &simpleStrategy{rf: rf} @@ -108,7 +109,8 @@ func getStrategy(ks *KeyspaceMetadata, logger StdLogger) placementStrategy { rf, err := getReplicationFactorFromOpts(rf) if err != nil { - logger.Println("parse rf for keyspace %q, dc %q: %v", err) + logger.Warning("Failed to parse replication factors of keyspace configured with NetworkTopologyStrategy.", + newLogFieldString("keyspace", ks.Name), newLogFieldString("dc", dc), newLogFieldError("err", err)) // skip DC if the rf is invalid/unsupported, so that we can at least work with other working DCs. continue } @@ -119,7 +121,8 @@ func getStrategy(ks *KeyspaceMetadata, logger StdLogger) placementStrategy { case strings.Contains(ks.StrategyClass, "LocalStrategy"): return nil default: - logger.Printf("parse rf for keyspace %q: unsupported strategy class: %v", ks.StrategyClass) + logger.Warning("Failed to parse replication factor of keyspace due to unknown strategy class.", + newLogFieldString("keyspace", ks.Name), newLogFieldString("strategy_class", ks.StrategyClass)) return nil } } diff --git a/topology_test.go b/topology_test.go index fe8473e9..9a5cf1c1 100644 --- a/topology_test.go +++ b/topology_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org