This is an automated email from the ASF dual-hosted git repository.

joaoreis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git
The following commit(s) were added to refs/heads/trunk by this push: new 05e6036f Endless query execution fix 05e6036f is described below commit 05e6036ff1d7d3d8aaacbe6817723e435cad22db Author: tengu-alt <olexandr.luzh...@gmail.com> AuthorDate: Fri May 16 16:02:55 2025 +0300 Endless query execution fix Fix for the endless query execution when HostSelectionPolicy returns the same downed host. Internal HostSelectionPolicy will return the host only once if HostID is set. Documentation for external policies was added. patch by Oleksandr Luzhniy; reviewed by João Reis, for CASSGO-50 --- CHANGELOG.md | 2 ++ events_ccm_test.go | 6 +++--- policies.go | 1 + query_executor.go | 15 +++++++++------ ring.go | 6 +++--- 5 files changed, 18 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 554d8be9..99f1defb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Don't panic in MapExecuteBatchCAS if no `[applied]` column is returned (CASSGO-42) +- Endless query execution fix (CASSGO-50) + ## [1.7.0] - 2024-09-23 This release is the first after the donation of gocql to the Apache Software Foundation (ASF) diff --git a/events_ccm_test.go b/events_ccm_test.go index a105985b..6c605d79 100644 --- a/events_ccm_test.go +++ b/events_ccm_test.go @@ -104,7 +104,7 @@ func TestEventNodeDownControl(t *testing.T) { } session.pool.mu.RUnlock() - host := session.ring.getHost(node.Addr) + host, _ := session.ring.getHost(node.Addr) if host == nil { t.Fatal("node not in metadata ring") } else if host.IsUp() { @@ -146,7 +146,7 @@ func TestEventNodeDown(t *testing.T) { t.Fatal("node not removed after remove event") } - host := session.ring.getHost(node.Addr) + host, _ := session.ring.getHost(node.Addr) if host == nil { t.Fatal("node not in metadata ring") } else if host.IsUp() { @@ -203,7 +203,7 @@ func TestEventNodeUp(t *testing.T) { t.Fatal("node not added after node added event") } 
- host := session.ring.getHost(node.Addr) + host, _ := session.ring.getHost(node.Addr) if host == nil { t.Fatal("node not in metadata ring") } else if !host.IsUp() { diff --git a/policies.go b/policies.go index ed0b02f3..219f472d 100644 --- a/policies.go +++ b/policies.go @@ -323,6 +323,7 @@ func (host *selectedHost) Info() *HostInfo { func (host *selectedHost) Mark(err error) {} // NextHost is an iteration function over picked hosts +// Should return nil eventually to prevent endless query execution. type NextHost func() SelectedHost // RoundRobinHostPolicy is a round-robin load balancing policy, where each host diff --git a/query_executor.go b/query_executor.go index 9eaf19db..61a43c8c 100644 --- a/query_executor.go +++ b/query_executor.go @@ -27,6 +27,7 @@ package gocql import ( "context" "sync" + "sync/atomic" "time" ) @@ -89,14 +90,16 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) { // check if the host id is specified for the query, // if it is, the query should be executed at the corresponding host. 
if hostID := qry.GetHostID(); hostID != "" { + host, ok := q.pool.session.ring.getHost(hostID) + if !ok { + return nil, ErrNoConnections + } + var returnedHostOnce int32 = 0 hostIter = func() SelectedHost { - pool, ok := q.pool.getPoolByHostID(hostID) - // if the specified host is down - // we return nil to avoid endless query execution in queryExecutor.do() - if !ok || !pool.host.IsUp() { - return nil + if atomic.CompareAndSwapInt32(&returnedHostOnce, 0, 1) { + return (*selectedHost)(host) } - return (*selectedHost)(pool.host) + return nil } } diff --git a/ring.go b/ring.go index 6821c0df..1dbf07c4 100644 --- a/ring.go +++ b/ring.go @@ -67,11 +67,11 @@ func (r *ring) getHostByIP(ip string) (*HostInfo, bool) { return r.hosts[hi], ok } -func (r *ring) getHost(hostID string) *HostInfo { +func (r *ring) getHost(hostID string) (host *HostInfo, ok bool) { r.mu.RLock() - host := r.hosts[hostID] + host, ok = r.hosts[hostID] r.mu.RUnlock() - return host + return } func (r *ring) allHosts() []*HostInfo { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org