This is an automated email from the ASF dual-hosted git repository.
joaoreis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git
The following commit(s) were added to refs/heads/trunk by this push:
new 759351eb Don't collect host metrics if a query/batch observer is not provided
759351eb is described below
commit 759351eb6a57f821c23bd56c40b7ab03789f7c16
Author: João Reis <[email protected]>
AuthorDate: Wed Sep 24 19:14:46 2025 +0100
Don't collect host metrics if a query/batch observer is not provided
Patch by João Reis; reviewed by Bohdan Siryk and James Hartig for CASSGO-90
---
CHANGELOG.md | 4 ++
cassandra_test.go | 2 +-
conn.go | 7 +++-
conn_test.go | 24 ++++++------
control.go | 7 ++--
policies_test.go | 6 ++-
query_executor.go | 79 ++++++++++++++++++++++++---------------
session.go | 109 +++++++++++++++++++++---------------------------------
8 files changed, 123 insertions(+), 115 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fb779d7d..d02991cc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
+#### 2.0.0
+
+- Don't collect host metrics if a query/batch observer is not provided (CASSGO-90)
+
#### 2.0.0-rc1
- Support vector type (CASSGO-11)
diff --git a/cassandra_test.go b/cassandra_test.go
index b92c3cd6..cb660755 100644
--- a/cassandra_test.go
+++ b/cassandra_test.go
@@ -2068,7 +2068,7 @@ func TestQueryStats(t *testing.T) {
t.Fatal("expected at least 1 attempt, but got 0")
}
if iter.Latency() <= 0 {
- t.Fatalf("expected latency to be greater than 0, but
got %v instead.", iter.Latency())
+ t.Fatalf("expected latency to be > 0, but got %v
instead.", iter.Latency())
}
}
}
diff --git a/conn.go b/conn.go
index b7f3723d..bf5a493d 100644
--- a/conn.go
+++ b/conn.go
@@ -1670,7 +1670,12 @@ func (c *Conn) executeQuery(ctx context.Context, q
*internalQuery) *Iter {
newQry := new(internalQuery)
*newQry = *q
newQry.pageState = copyBytes(x.meta.pagingState)
- newQry.metrics = &queryMetrics{m:
make(map[string]*hostMetrics)}
+ newQry.metrics = &queryMetrics{}
+ if newQry.qryOpts.observer != nil {
+ newQry.hostMetricsManager =
newHostMetricsManager()
+ } else {
+ newQry.hostMetricsManager =
emptyHostMetricsManager
+ }
iter.next = &nextIter{
q: newQry,
diff --git a/conn_test.go b/conn_test.go
index d3271acc..3646dcc8 100644
--- a/conn_test.go
+++ b/conn_test.go
@@ -450,28 +450,28 @@ func TestQueryMultinodeWithMetrics(t *testing.T) {
if err == nil {
t.Fatalf("expected error")
}
+ totalLatency := int64(0)
+ totalAttempts := int64(0)
for i, ip := range addresses {
host := &HostInfo{connectAddress: net.ParseIP(ip)}
- queryMetric := iter.metrics.hostMetrics(host)
observedMetrics := observer.GetMetrics(host)
-
requests := int(atomic.LoadInt64(&nodes[i].nKillReq))
- hostAttempts := queryMetric.Attempts
- if requests != hostAttempts {
- t.Fatalf("expected requests %v to match query attempts
%v", requests, hostAttempts)
- }
- if hostAttempts != observedMetrics.Attempts {
- t.Fatalf("expected observed attempts %v to match query
attempts %v on host %v", observedMetrics.Attempts, hostAttempts, ip)
+ if requests != observedMetrics.Attempts {
+ t.Fatalf("expected observed attempts %v to match server
requests %v on host %v", observedMetrics.Attempts, requests, ip)
}
- hostLatency := queryMetric.TotalLatency
observedLatency := observedMetrics.TotalLatency
- if hostLatency != observedLatency {
- t.Fatalf("expected observed latency %v to match query
latency %v on host %v", observedLatency, hostLatency, ip)
- }
+ totalLatency += observedLatency
+ totalAttempts += int64(observedMetrics.Attempts)
}
+
+ observedLatency := totalLatency / totalAttempts
+ if observedLatency != iter.Latency() {
+ t.Fatalf("expected observed latency %v (%v/%v) to match query
latency %v", observedLatency, totalLatency, totalAttempts, iter.Latency())
+ }
+
// the query will only be attempted once, but is being retried
attempts := iter.Attempts()
if attempts != rt.NumRetries {
diff --git a/control.go b/control.go
index e3c835b4..25957da1 100644
--- a/control.go
+++ b/control.go
@@ -580,7 +580,7 @@ func (c *controlConn) withConnHost(fn func(*connHost)
*Iter) *Iter {
return fn(ch)
}
- return newErrIter(errNoControl, newQueryMetrics(), "", nil, nil)
+ return newErrIter(errNoControl, &queryMetrics{}, "", nil, nil)
}
func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter {
@@ -605,7 +605,8 @@ func (c *controlConn) query(statement string, values
...interface{}) (iter *Iter
newLogFieldString("statement", statement),
newLogFieldError("err", iter.err))
}
- iter.metrics.attempt(1, 0, c.getConn().host, false)
+ qry.metrics.attempt(0)
+ qry.hostMetricsManager.attempt(0, c.getConn().host)
if iter.err == nil || !c.retry.Attempt(qry) {
break
}
@@ -616,7 +617,7 @@ func (c *controlConn) query(statement string, values
...interface{}) (iter *Iter
func (c *controlConn) awaitSchemaAgreement() error {
return c.withConn(func(conn *Conn) *Iter {
- return newErrIter(conn.awaitSchemaAgreement(context.TODO()),
newQueryMetrics(), "", nil, nil)
+ return newErrIter(conn.awaitSchemaAgreement(context.TODO()),
&queryMetrics{}, "", nil, nil)
}).err
}
diff --git a/policies_test.go b/policies_test.go
index ab40a62a..62b3ad12 100644
--- a/policies_test.go
+++ b/policies_test.go
@@ -261,7 +261,8 @@ func TestSimpleRetryPolicy(t *testing.T) {
}
for _, c := range cases {
- q.metrics =
preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts:
c.attempts}})
+ q.metrics = &queryMetrics{totalAttempts: int64(c.attempts)}
+ q.hostMetricsManager =
preFilledHostMetricsMetricsManager(map[string]*hostMetrics{"127.0.0.1":
{Attempts: c.attempts}})
if c.allow && !rt.Attempt(q) {
t.Fatalf("should allow retry after %d attempts",
c.attempts)
}
@@ -345,7 +346,8 @@ func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
}
for _, c := range cases {
- q.metrics =
preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts:
c.attempts}})
+ q.metrics = &queryMetrics{totalAttempts: int64(c.attempts)}
+ q.hostMetricsManager =
preFilledHostMetricsMetricsManager(map[string]*hostMetrics{"127.0.0.1":
{Attempts: c.attempts}})
if c.retryType != rt.GetRetryType(c.err) {
t.Fatalf("retry type should be %v", c.retryType)
}
diff --git a/query_executor.go b/query_executor.go
index 552d0b97..35422ffe 100644
--- a/query_executor.go
+++ b/query_executor.go
@@ -334,14 +334,15 @@ func newQueryOptions(q *Query, ctx context.Context)
*queryOptions {
}
type internalQuery struct {
- originalQuery *Query
- qryOpts *queryOptions
- pageState []byte
- metrics *queryMetrics
- conn *Conn
- consistency uint32
- session *Session
- routingInfo *queryRoutingInfo
+ originalQuery *Query
+ qryOpts *queryOptions
+ pageState []byte
+ conn *Conn
+ consistency uint32
+ session *Session
+ routingInfo *queryRoutingInfo
+ metrics *queryMetrics
+ hostMetricsManager hostMetricsManager
}
func newInternalQuery(q *Query, ctx context.Context) *internalQuery {
@@ -351,15 +352,22 @@ func newInternalQuery(q *Query, ctx context.Context)
*internalQuery {
newPageState = make([]byte, len(pageState))
copy(newPageState, pageState)
}
+ var hostMetricsMgr hostMetricsManager
+ if q.observer != nil {
+ hostMetricsMgr = newHostMetricsManager()
+ } else {
+ hostMetricsMgr = emptyHostMetricsManager
+ }
return &internalQuery{
- originalQuery: q,
- qryOpts: newQueryOptions(q, ctx),
- metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
- consistency: uint32(q.initialConsistency),
- pageState: newPageState,
- conn: nil,
- session: q.session,
- routingInfo: &queryRoutingInfo{},
+ originalQuery: q,
+ qryOpts: newQueryOptions(q, ctx),
+ metrics: &queryMetrics{},
+ hostMetricsManager: hostMetricsMgr,
+ consistency: uint32(q.initialConsistency),
+ pageState: newPageState,
+ conn: nil,
+ session: q.session,
+ routingInfo: &queryRoutingInfo{},
}
}
@@ -370,9 +378,10 @@ func (q *internalQuery) Attempts() int {
func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter
*Iter, host *HostInfo) {
latency := end.Sub(start)
- attempt, metricsForHost := q.metrics.attempt(1, latency, host,
q.qryOpts.observer != nil)
+ attempt := q.metrics.attempt(latency)
if q.qryOpts.observer != nil {
+ metricsForHost := q.hostMetricsManager.attempt(latency, host)
q.qryOpts.observer.ObserveQuery(q.qryOpts.context,
ObservedQuery{
Keyspace: keyspace,
Statement: q.qryOpts.stmt,
@@ -546,22 +555,30 @@ func newBatchOptions(b *Batch, ctx context.Context)
*batchOptions {
}
type internalBatch struct {
- originalBatch *Batch
- batchOpts *batchOptions
- metrics *queryMetrics
- consistency uint32
- routingInfo *queryRoutingInfo
- session *Session
+ originalBatch *Batch
+ batchOpts *batchOptions
+ consistency uint32
+ routingInfo *queryRoutingInfo
+ session *Session
+ metrics *queryMetrics
+ hostMetricsManager hostMetricsManager
}
func newInternalBatch(batch *Batch, ctx context.Context) *internalBatch {
+ var hostMetricsMgr hostMetricsManager
+ if batch.observer != nil {
+ hostMetricsMgr = newHostMetricsManager()
+ } else {
+ hostMetricsMgr = emptyHostMetricsManager
+ }
return &internalBatch{
- originalBatch: batch,
- batchOpts: newBatchOptions(batch, ctx),
- metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
- routingInfo: &queryRoutingInfo{},
- session: batch.session,
- consistency: uint32(batch.GetConsistency()),
+ originalBatch: batch,
+ batchOpts: newBatchOptions(batch, ctx),
+ routingInfo: &queryRoutingInfo{},
+ session: batch.session,
+ consistency: uint32(batch.GetConsistency()),
+ metrics: &queryMetrics{},
+ hostMetricsManager: hostMetricsMgr,
}
}
@@ -572,12 +589,14 @@ func (b *internalBatch) Attempts() int {
func (b *internalBatch) attempt(keyspace string, end, start time.Time, iter
*Iter, host *HostInfo) {
latency := end.Sub(start)
- attempt, metricsForHost := b.metrics.attempt(1, latency, host,
b.batchOpts.observer != nil)
+ attempt := b.metrics.attempt(latency)
if b.batchOpts.observer == nil {
return
}
+ metricsForHost := b.hostMetricsManager.attempt(latency, host)
+
statements := make([]string, len(b.batchOpts.entries))
values := make([][]interface{}, len(b.batchOpts.entries))
diff --git a/session.go b/session.go
index 4e83e5e5..625dac4a 100644
--- a/session.go
+++ b/session.go
@@ -389,7 +389,7 @@ func (s *Session) AwaitSchemaAgreement(ctx context.Context)
error {
return errNoControl
}
return s.control.withConn(func(conn *Conn) *Iter {
- return newErrIter(conn.awaitSchemaAgreement(ctx),
newQueryMetrics(), "", nil, nil)
+ return newErrIter(conn.awaitSchemaAgreement(ctx),
&queryMetrics{}, "", nil, nil)
}).err
}
@@ -861,40 +861,48 @@ type hostMetrics struct {
}
type queryMetrics struct {
- l sync.RWMutex
- m map[string]*hostMetrics
- // totalAttempts is total number of attempts.
- // Equal to sum of all hostMetrics' Attempts.
- totalAttempts int
+ totalAttempts int64
+ totalLatency int64
}
-func newQueryMetrics() *queryMetrics {
- return &queryMetrics{m: make(map[string]*hostMetrics)}
+func (qm *queryMetrics) attempt(addLatency time.Duration) int {
+ atomic.AddInt64(&qm.totalLatency, addLatency.Nanoseconds())
+ return int(atomic.AddInt64(&qm.totalAttempts, 1) - 1)
+}
+
+func (qm *queryMetrics) attempts() int {
+ return int(atomic.LoadInt64(&qm.totalAttempts))
}
-// preFilledQueryMetrics initializes new queryMetrics based on per-host
supplied data.
-func preFilledQueryMetrics(m map[string]*hostMetrics) *queryMetrics {
- qm := &queryMetrics{m: m}
- for _, hm := range qm.m {
- qm.totalAttempts += hm.Attempts
+func (qm *queryMetrics) latency() int64 {
+ attempts := atomic.LoadInt64(&qm.totalAttempts)
+ if attempts == 0 {
+ return atomic.LoadInt64(&qm.totalLatency)
}
- return qm
+ return atomic.LoadInt64(&qm.totalLatency) / attempts
}
-// hostMetrics returns a snapshot of metrics for given host.
-// If the metrics for host don't exist, they are created.
-func (qm *queryMetrics) hostMetrics(host *HostInfo) *hostMetrics {
- qm.l.Lock()
- metrics := qm.hostMetricsLocked(host)
- copied := new(hostMetrics)
- *copied = *metrics
- qm.l.Unlock()
- return copied
+type hostMetricsManager interface {
+ attempt(addLatency time.Duration, host *HostInfo) *hostMetrics
+}
+
+type hostMetricsManagerImpl struct {
+ l sync.RWMutex
+ m map[string]*hostMetrics
+}
+
+func newHostMetricsManager() *hostMetricsManagerImpl {
+ return &hostMetricsManagerImpl{m: make(map[string]*hostMetrics)}
+}
+
+// preFilledHostMetricsMetricsManager initializes new hostMetrics based on
per-host supplied data.
+func preFilledHostMetricsMetricsManager(m map[string]*hostMetrics)
*hostMetricsManagerImpl {
+ return &hostMetricsManagerImpl{m: m}
}
// hostMetricsLocked gets or creates host metrics for given host.
// It must be called only while holding qm.l lock.
-func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics {
+func (qm *hostMetricsManagerImpl) hostMetricsLocked(host *HostInfo)
*hostMetrics {
metrics, exists := qm.m[host.ConnectAddress().String()]
if !exists {
// if the host is not in the map, it means it's been accessed
for the first time
@@ -905,53 +913,22 @@ func (qm *queryMetrics) hostMetricsLocked(host *HostInfo)
*hostMetrics {
return metrics
}
-// attempts returns the number of times the query was executed.
-func (qm *queryMetrics) attempts() int {
+func (qm *hostMetricsManagerImpl) attempt(addLatency time.Duration, host
*HostInfo) *hostMetrics {
qm.l.Lock()
- attempts := qm.totalAttempts
- qm.l.Unlock()
- return attempts
-}
-
-func (qm *queryMetrics) latency() int64 {
- qm.l.Lock()
- var (
- attempts int
- latency int64
- )
- for _, metric := range qm.m {
- attempts += metric.Attempts
- latency += metric.TotalLatency
- }
+ updateHostMetrics := qm.hostMetricsLocked(host)
+ updateHostMetrics.Attempts += 1
+ updateHostMetrics.TotalLatency += addLatency.Nanoseconds()
qm.l.Unlock()
- if attempts > 0 {
- return latency / int64(attempts)
- }
- return 0
+ return updateHostMetrics
}
-// attempt adds given number of attempts and latency for given host.
-// It returns previous total attempts.
-// If needsHostMetrics is true, a copy of updated hostMetrics is returned.
-func (qm *queryMetrics) attempt(addAttempts int, addLatency time.Duration,
- host *HostInfo, needsHostMetrics bool) (int, *hostMetrics) {
- qm.l.Lock()
-
- totalAttempts := qm.totalAttempts
- qm.totalAttempts += addAttempts
-
- updateHostMetrics := qm.hostMetricsLocked(host)
- updateHostMetrics.Attempts += addAttempts
- updateHostMetrics.TotalLatency += addLatency.Nanoseconds()
+var emptyHostMetricsManager = &emptyHostMetricsManagerImpl{}
- var hostMetricsCopy *hostMetrics
- if needsHostMetrics {
- hostMetricsCopy = new(hostMetrics)
- *hostMetricsCopy = *updateHostMetrics
- }
+type emptyHostMetricsManagerImpl struct {
+}
- qm.l.Unlock()
- return totalAttempts, hostMetricsCopy
+func (qm *emptyHostMetricsManagerImpl) attempt(_ time.Duration, _ *HostInfo)
*hostMetrics {
+ return nil
}
// Query represents a CQL statement that can be executed.
@@ -1297,7 +1274,7 @@ func (q *Query) Iter() *Iter {
// over all results.
func (q *Query) IterContext(ctx context.Context) *Iter {
if isUseStatement(q.stmt) {
- return newErrIter(ErrUseStmt, newQueryMetrics(), q.Keyspace(),
nil, q.getKeyspace)
+ return newErrIter(ErrUseStmt, &queryMetrics{}, q.Keyspace(),
nil, q.getKeyspace)
}
internalQry := newInternalQuery(q, ctx)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]