This is an automated email from the ASF dual-hosted git repository.
joaoreis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git
The following commit(s) were added to refs/heads/trunk by this push:
new f3e2b397 Prefer connection address over system table addresses
f3e2b397 is described below
commit f3e2b397adaaf5f8742068dfbd88958c824934ee
Author: João Reis <[email protected]>
AuthorDate: Fri Oct 10 13:21:56 2025 +0100
Prefer connection address over system table addresses
There is an unintended change in 2.0-rc1 causing the driver to prefer
system table addresses over the connection address, which breaks deployments
that rely on the 1.x behavior of preferring the connection address.
This patch fixes the regression and keeps the behavior the same as it was
in 1.x. It also fixes an issue where the connection address was not being
used when a full ring refresh was triggered.
Patch by João Reis; reviewed by James Hartig for CASSGO-91
---
CHANGELOG.md | 1 +
control.go | 2 +-
docker_test.go | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
host_source.go | 26 ++++++++++---------
session.go | 7 ++++-
5 files changed, 104 insertions(+), 14 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8a3da0a..2fa50aa0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -76,6 +76,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Driver closes connection when timeout occurs (CASSGO-87)
- Do not set beta protocol flag when using v5 (CASSGO-88)
+- Driver is using system table ip addresses over the connection address (CASSGO-91)
#### 2.0.0-rc1
diff --git a/control.go b/control.go
index 25957da1..de374c58 100644
--- a/control.go
+++ b/control.go
@@ -343,7 +343,7 @@ type connHost struct {
func (c *controlConn) setupConn(conn *Conn, sessionInit bool) error {
// we need up-to-date host info for the filterHost call below
iter := conn.querySystemLocal(context.TODO())
- host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress,
conn.r.RemoteAddr().(*net.TCPAddr).Port)
+ host, err := c.session.hostInfoFromIter(iter,
conn.host.ConnectAddress(), conn.r.RemoteAddr().(*net.TCPAddr).Port)
if err != nil {
// just cleanup
iter.Close()
diff --git a/docker_test.go b/docker_test.go
new file mode 100644
index 00000000..3d694751
--- /dev/null
+++ b/docker_test.go
@@ -0,0 +1,82 @@
+//go:build all || cassandra
+// +build all cassandra
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2016, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+import (
+ "fmt"
+ "os/exec"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// This test tests that gocql is able to connect to a C* node provisioned with Docker
+// This is useful to make sure we don't break common testing configurations
+func TestDocker(t *testing.T) {
+ version := "3.11.11"
+ randomUuid := MustRandomUUID().String()
+ err := exec.Command("docker", "run", "-d", "--name", randomUuid, "-p",
"9080:9042", fmt.Sprintf("cassandra:%s", version)).Run()
+ defer exec.Command("docker", "rm", "-f", randomUuid).Run()
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ cluster := NewCluster("localhost:9080")
+ cluster.Logger = NewLogger(LogLevelDebug)
+
+ timer := time.After(60 * time.Second)
+ var session *Session
+ done := false
+ for !done {
+ select {
+ case <-timer:
+ t.Fatalf("timed out, last err: %v", err)
+ default:
+ session, err = cluster.CreateSession()
+ if err == nil {
+ done = true
+ } else if strings.Contains(err.Error(), "unable to
discover protocol version") {
+ time.Sleep(5 * time.Second)
+ } else {
+ t.Fatal(err)
+ }
+ }
+ }
+
+ defer session.Close()
+ var parsedVersion string
+ err = session.Query("SELECT release_version FROM
system.local").Scan(&parsedVersion)
+ if err != nil {
+ t.Fatalf("failed to query: %s", err)
+ }
+
+ assert.Equal(t, version, parsedVersion)
+}
diff --git a/host_source.go b/host_source.go
index 3ee3dd07..ab653740 100644
--- a/host_source.go
+++ b/host_source.go
@@ -267,6 +267,13 @@ func (h *HostInfo) ConnectAddress() net.IP {
return addr
}
+// actualConnectAddress can be used to access the connectAddress field with the lock (mu).
+func (h *HostInfo) actualConnectAddress() net.IP {
+ h.mu.RLock()
+ defer h.mu.RUnlock()
+ return h.connectAddress
+}
+
func (h *HostInfo) BroadcastAddress() net.IP {
h.mu.RLock()
defer h.mu.RUnlock()
@@ -680,16 +687,10 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP,
defaultPort int, row map
}
}
- // Determine the connect address from available addresses
- if validIpAddr(host.rpcAddress) {
- host.connectAddress = host.rpcAddress
- } else if validIpAddr(host.preferredIP) {
- host.connectAddress = host.preferredIP
- } else if validIpAddr(host.broadcastAddress) {
- host.connectAddress = host.broadcastAddress
- } else if validIpAddr(host.peer) {
- host.connectAddress = host.peer
- }
+ // this ensures that connectAddress gets a valid IP starting with host.connectAddress and if it's not valid
+ // then falls back to an address read from the system table
+ // it is important that a system table address is not picked up UNLESS connectAddress is nil or not valid
+ host.connectAddress, _ = host.connectAddressLocked()
if s != nil && s.cfg.AddressTranslator != nil {
ip, port := s.cfg.translateAddressPort(host.ConnectAddress(),
host.port, s.logger)
@@ -745,7 +746,8 @@ func (r *ringDescriber) getLocalHostInfo() (*HostInfo,
error) {
return nil, errNoControl
}
- host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port)
+ // keep connect address for local host, ignore address from system.local
+ host, err := r.session.hostInfoFromIter(iter,
iter.host.actualConnectAddress(), r.session.cfg.Port)
if err != nil {
// just cleanup
iter.Close()
@@ -873,7 +875,7 @@ func refreshRing(r *ringDescriber) error {
if !ok {
return fmt.Errorf("get existing host=%s from
prevHosts: %w", h, ErrCannotFindHost)
}
- if h.connectAddress.Equal(existing.connectAddress) &&
h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) {
+ if
h.actualConnectAddress().Equal(existing.actualConnectAddress()) &&
h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) {
// no host IP change
host.update(h)
} else {
diff --git a/session.go b/session.go
index 722161f1..111c7d01 100644
--- a/session.go
+++ b/session.go
@@ -1333,7 +1333,12 @@ func (q *Query) iterInternal(c *Conn, ctx
context.Context) *Iter {
internalQry := newInternalQuery(q, ctx)
internalQry.conn = c
- return c.executeQuery(internalQry.Context(), internalQry)
+ iter := c.executeQuery(internalQry.Context(), internalQry)
+ if iter != nil {
+ // set iter.host so that the caller can retrieve the connect address which should be preferable (if valid) for the local host
+ iter.host = c.host
+ }
+ return iter
}
// MapScan executes the query, copies the columns of the first selected
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]