This is an automated email from the ASF dual-hosted git repository.

joaoreis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 759351eb Don't collect host metrics if a query/batch observer is not provided
759351eb is described below

commit 759351eb6a57f821c23bd56c40b7ab03789f7c16
Author: João Reis <[email protected]>
AuthorDate: Wed Sep 24 19:14:46 2025 +0100

    Don't collect host metrics if a query/batch observer is not provided
    
    Patch by João Reis; reviewed by Bohdan Siryk and James Hartig for CASSGO-90
---
 CHANGELOG.md      |   4 ++
 cassandra_test.go |   2 +-
 conn.go           |   7 +++-
 conn_test.go      |  24 ++++++------
 control.go        |   7 ++--
 policies_test.go  |   6 ++-
 query_executor.go |  79 ++++++++++++++++++++++++---------------
 session.go        | 109 +++++++++++++++++++++---------------------------------
 8 files changed, 123 insertions(+), 115 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fb779d7d..d02991cc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+#### 2.0.0
+
+- Don't collect host metrics if a query/batch observer is not provided (CASSGO-90)
+
 #### 2.0.0-rc1
 
 - Support vector type (CASSGO-11)
diff --git a/cassandra_test.go b/cassandra_test.go
index b92c3cd6..cb660755 100644
--- a/cassandra_test.go
+++ b/cassandra_test.go
@@ -2068,7 +2068,7 @@ func TestQueryStats(t *testing.T) {
                        t.Fatal("expected at least 1 attempt, but got 0")
                }
                if iter.Latency() <= 0 {
-                       t.Fatalf("expected latency to be greater than 0, but 
got %v instead.", iter.Latency())
+                       t.Fatalf("expected latency to be > 0, but got %v 
instead.", iter.Latency())
                }
        }
 }
diff --git a/conn.go b/conn.go
index b7f3723d..bf5a493d 100644
--- a/conn.go
+++ b/conn.go
@@ -1670,7 +1670,12 @@ func (c *Conn) executeQuery(ctx context.Context, q 
*internalQuery) *Iter {
                        newQry := new(internalQuery)
                        *newQry = *q
                        newQry.pageState = copyBytes(x.meta.pagingState)
-                       newQry.metrics = &queryMetrics{m: 
make(map[string]*hostMetrics)}
+                       newQry.metrics = &queryMetrics{}
+                       if newQry.qryOpts.observer != nil {
+                               newQry.hostMetricsManager = 
newHostMetricsManager()
+                       } else {
+                               newQry.hostMetricsManager = 
emptyHostMetricsManager
+                       }
 
                        iter.next = &nextIter{
                                q:   newQry,
diff --git a/conn_test.go b/conn_test.go
index d3271acc..3646dcc8 100644
--- a/conn_test.go
+++ b/conn_test.go
@@ -450,28 +450,28 @@ func TestQueryMultinodeWithMetrics(t *testing.T) {
        if err == nil {
                t.Fatalf("expected error")
        }
+       totalLatency := int64(0)
+       totalAttempts := int64(0)
 
        for i, ip := range addresses {
                host := &HostInfo{connectAddress: net.ParseIP(ip)}
-               queryMetric := iter.metrics.hostMetrics(host)
                observedMetrics := observer.GetMetrics(host)
-
                requests := int(atomic.LoadInt64(&nodes[i].nKillReq))
-               hostAttempts := queryMetric.Attempts
-               if requests != hostAttempts {
-                       t.Fatalf("expected requests %v to match query attempts 
%v", requests, hostAttempts)
-               }
 
-               if hostAttempts != observedMetrics.Attempts {
-                       t.Fatalf("expected observed attempts %v to match query 
attempts %v on host %v", observedMetrics.Attempts, hostAttempts, ip)
+               if requests != observedMetrics.Attempts {
+                       t.Fatalf("expected observed attempts %v to match server 
requests %v on host %v", observedMetrics.Attempts, requests, ip)
                }
 
-               hostLatency := queryMetric.TotalLatency
                observedLatency := observedMetrics.TotalLatency
-               if hostLatency != observedLatency {
-                       t.Fatalf("expected observed latency %v to match query 
latency %v on host %v", observedLatency, hostLatency, ip)
-               }
+               totalLatency += observedLatency
+               totalAttempts += int64(observedMetrics.Attempts)
        }
+
+       observedLatency := totalLatency / totalAttempts
+       if observedLatency != iter.Latency() {
+               t.Fatalf("expected observed latency %v (%v/%v) to match query 
latency %v", observedLatency, totalLatency, totalAttempts, iter.Latency())
+       }
+
        // the query will only be attempted once, but is being retried
        attempts := iter.Attempts()
        if attempts != rt.NumRetries {
diff --git a/control.go b/control.go
index e3c835b4..25957da1 100644
--- a/control.go
+++ b/control.go
@@ -580,7 +580,7 @@ func (c *controlConn) withConnHost(fn func(*connHost) 
*Iter) *Iter {
                return fn(ch)
        }
 
-       return newErrIter(errNoControl, newQueryMetrics(), "", nil, nil)
+       return newErrIter(errNoControl, &queryMetrics{}, "", nil, nil)
 }
 
 func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter {
@@ -605,7 +605,8 @@ func (c *controlConn) query(statement string, values 
...interface{}) (iter *Iter
                                newLogFieldString("statement", statement), 
newLogFieldError("err", iter.err))
                }
 
-               iter.metrics.attempt(1, 0, c.getConn().host, false)
+               qry.metrics.attempt(0)
+               qry.hostMetricsManager.attempt(0, c.getConn().host)
                if iter.err == nil || !c.retry.Attempt(qry) {
                        break
                }
@@ -616,7 +617,7 @@ func (c *controlConn) query(statement string, values 
...interface{}) (iter *Iter
 
 func (c *controlConn) awaitSchemaAgreement() error {
        return c.withConn(func(conn *Conn) *Iter {
-               return newErrIter(conn.awaitSchemaAgreement(context.TODO()), 
newQueryMetrics(), "", nil, nil)
+               return newErrIter(conn.awaitSchemaAgreement(context.TODO()), 
&queryMetrics{}, "", nil, nil)
        }).err
 }
 
diff --git a/policies_test.go b/policies_test.go
index ab40a62a..62b3ad12 100644
--- a/policies_test.go
+++ b/policies_test.go
@@ -261,7 +261,8 @@ func TestSimpleRetryPolicy(t *testing.T) {
        }
 
        for _, c := range cases {
-               q.metrics = 
preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: 
c.attempts}})
+               q.metrics = &queryMetrics{totalAttempts: int64(c.attempts)}
+               q.hostMetricsManager = 
preFilledHostMetricsMetricsManager(map[string]*hostMetrics{"127.0.0.1": 
{Attempts: c.attempts}})
                if c.allow && !rt.Attempt(q) {
                        t.Fatalf("should allow retry after %d attempts", 
c.attempts)
                }
@@ -345,7 +346,8 @@ func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
        }
 
        for _, c := range cases {
-               q.metrics = 
preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: 
c.attempts}})
+               q.metrics = &queryMetrics{totalAttempts: int64(c.attempts)}
+               q.hostMetricsManager = 
preFilledHostMetricsMetricsManager(map[string]*hostMetrics{"127.0.0.1": 
{Attempts: c.attempts}})
                if c.retryType != rt.GetRetryType(c.err) {
                        t.Fatalf("retry type should be %v", c.retryType)
                }
diff --git a/query_executor.go b/query_executor.go
index 552d0b97..35422ffe 100644
--- a/query_executor.go
+++ b/query_executor.go
@@ -334,14 +334,15 @@ func newQueryOptions(q *Query, ctx context.Context) 
*queryOptions {
 }
 
 type internalQuery struct {
-       originalQuery *Query
-       qryOpts       *queryOptions
-       pageState     []byte
-       metrics       *queryMetrics
-       conn          *Conn
-       consistency   uint32
-       session       *Session
-       routingInfo   *queryRoutingInfo
+       originalQuery      *Query
+       qryOpts            *queryOptions
+       pageState          []byte
+       conn               *Conn
+       consistency        uint32
+       session            *Session
+       routingInfo        *queryRoutingInfo
+       metrics            *queryMetrics
+       hostMetricsManager hostMetricsManager
 }
 
 func newInternalQuery(q *Query, ctx context.Context) *internalQuery {
@@ -351,15 +352,22 @@ func newInternalQuery(q *Query, ctx context.Context) 
*internalQuery {
                newPageState = make([]byte, len(pageState))
                copy(newPageState, pageState)
        }
+       var hostMetricsMgr hostMetricsManager
+       if q.observer != nil {
+               hostMetricsMgr = newHostMetricsManager()
+       } else {
+               hostMetricsMgr = emptyHostMetricsManager
+       }
        return &internalQuery{
-               originalQuery: q,
-               qryOpts:       newQueryOptions(q, ctx),
-               metrics:       &queryMetrics{m: make(map[string]*hostMetrics)},
-               consistency:   uint32(q.initialConsistency),
-               pageState:     newPageState,
-               conn:          nil,
-               session:       q.session,
-               routingInfo:   &queryRoutingInfo{},
+               originalQuery:      q,
+               qryOpts:            newQueryOptions(q, ctx),
+               metrics:            &queryMetrics{},
+               hostMetricsManager: hostMetricsMgr,
+               consistency:        uint32(q.initialConsistency),
+               pageState:          newPageState,
+               conn:               nil,
+               session:            q.session,
+               routingInfo:        &queryRoutingInfo{},
        }
 }
 
@@ -370,9 +378,10 @@ func (q *internalQuery) Attempts() int {
 
 func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter 
*Iter, host *HostInfo) {
        latency := end.Sub(start)
-       attempt, metricsForHost := q.metrics.attempt(1, latency, host, 
q.qryOpts.observer != nil)
+       attempt := q.metrics.attempt(latency)
 
        if q.qryOpts.observer != nil {
+               metricsForHost := q.hostMetricsManager.attempt(latency, host)
                q.qryOpts.observer.ObserveQuery(q.qryOpts.context, 
ObservedQuery{
                        Keyspace:  keyspace,
                        Statement: q.qryOpts.stmt,
@@ -546,22 +555,30 @@ func newBatchOptions(b *Batch, ctx context.Context) 
*batchOptions {
 }
 
 type internalBatch struct {
-       originalBatch *Batch
-       batchOpts     *batchOptions
-       metrics       *queryMetrics
-       consistency   uint32
-       routingInfo   *queryRoutingInfo
-       session       *Session
+       originalBatch      *Batch
+       batchOpts          *batchOptions
+       consistency        uint32
+       routingInfo        *queryRoutingInfo
+       session            *Session
+       metrics            *queryMetrics
+       hostMetricsManager hostMetricsManager
 }
 
 func newInternalBatch(batch *Batch, ctx context.Context) *internalBatch {
+       var hostMetricsMgr hostMetricsManager
+       if batch.observer != nil {
+               hostMetricsMgr = newHostMetricsManager()
+       } else {
+               hostMetricsMgr = emptyHostMetricsManager
+       }
        return &internalBatch{
-               originalBatch: batch,
-               batchOpts:     newBatchOptions(batch, ctx),
-               metrics:       &queryMetrics{m: make(map[string]*hostMetrics)},
-               routingInfo:   &queryRoutingInfo{},
-               session:       batch.session,
-               consistency:   uint32(batch.GetConsistency()),
+               originalBatch:      batch,
+               batchOpts:          newBatchOptions(batch, ctx),
+               routingInfo:        &queryRoutingInfo{},
+               session:            batch.session,
+               consistency:        uint32(batch.GetConsistency()),
+               metrics:            &queryMetrics{},
+               hostMetricsManager: hostMetricsMgr,
        }
 }
 
@@ -572,12 +589,14 @@ func (b *internalBatch) Attempts() int {
 
 func (b *internalBatch) attempt(keyspace string, end, start time.Time, iter 
*Iter, host *HostInfo) {
        latency := end.Sub(start)
-       attempt, metricsForHost := b.metrics.attempt(1, latency, host, 
b.batchOpts.observer != nil)
+       attempt := b.metrics.attempt(latency)
 
        if b.batchOpts.observer == nil {
                return
        }
 
+       metricsForHost := b.hostMetricsManager.attempt(latency, host)
+
        statements := make([]string, len(b.batchOpts.entries))
        values := make([][]interface{}, len(b.batchOpts.entries))
 
diff --git a/session.go b/session.go
index 4e83e5e5..625dac4a 100644
--- a/session.go
+++ b/session.go
@@ -389,7 +389,7 @@ func (s *Session) AwaitSchemaAgreement(ctx context.Context) 
error {
                return errNoControl
        }
        return s.control.withConn(func(conn *Conn) *Iter {
-               return newErrIter(conn.awaitSchemaAgreement(ctx), 
newQueryMetrics(), "", nil, nil)
+               return newErrIter(conn.awaitSchemaAgreement(ctx), 
&queryMetrics{}, "", nil, nil)
        }).err
 }
 
@@ -861,40 +861,48 @@ type hostMetrics struct {
 }
 
 type queryMetrics struct {
-       l sync.RWMutex
-       m map[string]*hostMetrics
-       // totalAttempts is total number of attempts.
-       // Equal to sum of all hostMetrics' Attempts.
-       totalAttempts int
+       totalAttempts int64
+       totalLatency  int64
 }
 
-func newQueryMetrics() *queryMetrics {
-       return &queryMetrics{m: make(map[string]*hostMetrics)}
+func (qm *queryMetrics) attempt(addLatency time.Duration) int {
+       atomic.AddInt64(&qm.totalLatency, addLatency.Nanoseconds())
+       return int(atomic.AddInt64(&qm.totalAttempts, 1) - 1)
+}
+
+func (qm *queryMetrics) attempts() int {
+       return int(atomic.LoadInt64(&qm.totalAttempts))
 }
 
-// preFilledQueryMetrics initializes new queryMetrics based on per-host supplied data.
-func preFilledQueryMetrics(m map[string]*hostMetrics) *queryMetrics {
-       qm := &queryMetrics{m: m}
-       for _, hm := range qm.m {
-               qm.totalAttempts += hm.Attempts
+func (qm *queryMetrics) latency() int64 {
+       attempts := atomic.LoadInt64(&qm.totalAttempts)
+       if attempts == 0 {
+               return atomic.LoadInt64(&qm.totalLatency)
        }
-       return qm
+       return atomic.LoadInt64(&qm.totalLatency) / attempts
 }
 
-// hostMetrics returns a snapshot of metrics for given host.
-// If the metrics for host don't exist, they are created.
-func (qm *queryMetrics) hostMetrics(host *HostInfo) *hostMetrics {
-       qm.l.Lock()
-       metrics := qm.hostMetricsLocked(host)
-       copied := new(hostMetrics)
-       *copied = *metrics
-       qm.l.Unlock()
-       return copied
+type hostMetricsManager interface {
+       attempt(addLatency time.Duration, host *HostInfo) *hostMetrics
+}
+
+type hostMetricsManagerImpl struct {
+       l sync.RWMutex
+       m map[string]*hostMetrics
+}
+
+func newHostMetricsManager() *hostMetricsManagerImpl {
+       return &hostMetricsManagerImpl{m: make(map[string]*hostMetrics)}
+}
+
+// preFilledHostMetricsMetricsManager initializes new hostMetrics based on per-host supplied data.
+func preFilledHostMetricsMetricsManager(m map[string]*hostMetrics) 
*hostMetricsManagerImpl {
+       return &hostMetricsManagerImpl{m: m}
 }
 
 // hostMetricsLocked gets or creates host metrics for given host.
 // It must be called only while holding qm.l lock.
-func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics {
+func (qm *hostMetricsManagerImpl) hostMetricsLocked(host *HostInfo) 
*hostMetrics {
        metrics, exists := qm.m[host.ConnectAddress().String()]
        if !exists {
                // if the host is not in the map, it means it's been accessed for the first time
@@ -905,53 +913,22 @@ func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) 
*hostMetrics {
        return metrics
 }
 
-// attempts returns the number of times the query was executed.
-func (qm *queryMetrics) attempts() int {
+func (qm *hostMetricsManagerImpl) attempt(addLatency time.Duration, host 
*HostInfo) *hostMetrics {
        qm.l.Lock()
-       attempts := qm.totalAttempts
-       qm.l.Unlock()
-       return attempts
-}
-
-func (qm *queryMetrics) latency() int64 {
-       qm.l.Lock()
-       var (
-               attempts int
-               latency  int64
-       )
-       for _, metric := range qm.m {
-               attempts += metric.Attempts
-               latency += metric.TotalLatency
-       }
+       updateHostMetrics := qm.hostMetricsLocked(host)
+       updateHostMetrics.Attempts += 1
+       updateHostMetrics.TotalLatency += addLatency.Nanoseconds()
        qm.l.Unlock()
-       if attempts > 0 {
-               return latency / int64(attempts)
-       }
-       return 0
+       return updateHostMetrics
 }
 
-// attempt adds given number of attempts and latency for given host.
-// It returns previous total attempts.
-// If needsHostMetrics is true, a copy of updated hostMetrics is returned.
-func (qm *queryMetrics) attempt(addAttempts int, addLatency time.Duration,
-       host *HostInfo, needsHostMetrics bool) (int, *hostMetrics) {
-       qm.l.Lock()
-
-       totalAttempts := qm.totalAttempts
-       qm.totalAttempts += addAttempts
-
-       updateHostMetrics := qm.hostMetricsLocked(host)
-       updateHostMetrics.Attempts += addAttempts
-       updateHostMetrics.TotalLatency += addLatency.Nanoseconds()
+var emptyHostMetricsManager = &emptyHostMetricsManagerImpl{}
 
-       var hostMetricsCopy *hostMetrics
-       if needsHostMetrics {
-               hostMetricsCopy = new(hostMetrics)
-               *hostMetricsCopy = *updateHostMetrics
-       }
+type emptyHostMetricsManagerImpl struct {
+}
 
-       qm.l.Unlock()
-       return totalAttempts, hostMetricsCopy
+func (qm *emptyHostMetricsManagerImpl) attempt(_ time.Duration, _ *HostInfo) 
*hostMetrics {
+       return nil
 }
 
 // Query represents a CQL statement that can be executed.
@@ -1297,7 +1274,7 @@ func (q *Query) Iter() *Iter {
 // over all results.
 func (q *Query) IterContext(ctx context.Context) *Iter {
        if isUseStatement(q.stmt) {
-               return newErrIter(ErrUseStmt, newQueryMetrics(), q.Keyspace(), 
nil, q.getKeyspace)
+               return newErrIter(ErrUseStmt, &queryMetrics{}, q.Keyspace(), 
nil, q.getKeyspace)
        }
 
        internalQry := newInternalQuery(q, ctx)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to