isopov opened a new issue, #1884: URL: https://github.com/apache/cassandra-gocql-driver/issues/1884
Sorry for the lengthy reproduction (it can probably be made smaller and more controlled).

What I do:

- Install bitnami/cassandra to my local k8s (I use kind): `helm install --namespace cassandra --create-namespace cassandra bitnami/cassandra --set "replicaCount=6"`
- Expose it so I can connect from localhost: `kubectl expose service cassandra --type=NodePort --name=cassandra-external --namespace=cassandra`
- Get the host for the connection with `docker inspect kind-control-plane | grep IPAddress`
- Get the port for the connection with `kubectl get services --namespace=cassandra | grep external`
- Get the password with `kubectl get secret --namespace "cassandra" cassandra -o jsonpath="{.data.cassandra-password}" | base64 -d`
- Run this code:

```
package main

import (
	"fmt"
	"math/rand/v2"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gocql/gocql"
)

const workers = 100
const queries = 10_000
const readWorkers = 100
const readQueries = 10_000_000

// helm install --namespace cassandra --create-namespace cassandra bitnami/cassandra --set "replicaCount=6"
// kubectl expose service cassandra --type=NodePort --name=cassandra-external --namespace=cassandra
func main() {
	//host from docker inspect kind-control-plane | grep IPAddress
	//port from kubectl get services --namespace=cassandra | grep external
	cluster := gocql.NewCluster("172.18.0.2:32146")
	cluster.Authenticator = gocql.PasswordAuthenticator{
		Username: "cassandra",
		//from kubectl get secret --namespace "cassandra" cassandra -o jsonpath="{.data.cassandra-password}" | base64 -d
		Password: "kMDfXIiu5M",
	}
	session, err := cluster.CreateSession()
	if err != nil {
		panic(err)
	}
	execRelease(session.Query("drop keyspace if exists k8stest"))
	execRelease(session.Query("create keyspace k8stest with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3}"))
	execRelease(session.Query("drop table if exists k8stest.test"))
	execRelease(session.Query("create table k8stest.test (a int, b text, primary key(a))"))

	// Write phase: each worker inserts its own contiguous range of keys.
	var wg sync.WaitGroup
	for i := 0; i <= workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			query := session.Query("insert into k8stest.test (a, b) values (?,?)")
			for j := i * queries; j < (i+1)*queries; j++ {
				query.Bind(j, "Message"+strconv.Itoa(j))
				if err := query.Exec(); err != nil {
					panic(err)
				}
			}
			query.Release()
		}()
	}
	wg.Wait()

	// Read phase: each worker reads random keys and tallies errors by message.
	var scans uint64
	var errors uint64
	var mu sync.RWMutex
	errorCounts := make(map[string]uint64)
	for i := 0; i <= readWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			query := session.Query("select b from k8stest.test where a=?")
			for j := i * readQueries; j < (i+1)*readQueries; j++ {
				id := rand.IntN(queries * workers)
				query.Bind(id)
				iter := query.Iter()
				var val string
				if iter.Scan(&val) {
					if val != "Message"+strconv.Itoa(id) {
						panic("unexpected message " + val + " instead of Message" + strconv.Itoa(id))
					}
					atomic.AddUint64(&scans, 1)
				} else {
					atomic.AddUint64(&errors, 1)
					time.Sleep(time.Millisecond)
				}
				if err := iter.Close(); err != nil {
					mu.Lock()
					errorCounts[err.Error()]++
					mu.Unlock()
					query.Release()
					query = session.Query("select b from k8stest.test where a=?")
				}
			}
			query.Release()
		}()
	}

	// Print progress once per second.
	go func() {
		for {
			time.Sleep(time.Second)
			fmt.Printf("##### %d scans, %d errors\n", scans, errors)
			mu.RLock()
			for err, count := range errorCounts {
				fmt.Printf("error %s count %d \n", err, count)
			}
			mu.RUnlock()
		}
	}()
	wg.Wait()
}

func execRelease(query *gocql.Query) {
	if err := query.Exec(); err != nil {
		println(err.Error())
		panic(err)
	}
	query.Release()
}
```

- After reading starts:

```
##### 4494 scans, 0 errors
##### 11271 scans, 0 errors
##### 17814 scans, 0 errors
##### 23093 scans, 0 errors
...
```

I remove one pod from the statefulset.

- I get a few errors, and reading continues:

```
##### 373219 scans, 685 errors
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
```

- However, after the Cassandra node rejoins the ring, the session breaks and I receive only errors:

```
##### 546274 scans, 685 errors
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
##### 553261 scans, 685 errors
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
##### 556674 scans, 41433 errors
error gocql: connection closed waiting for response count 101
error gocql: no hosts available in the pool count 40547
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
##### 556674 scans, 121771 errors
error gocql: connection closed waiting for response count 101
error gocql: no hosts available in the pool count 120903
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
...
##### 556674 scans, 10370862 errors
error Server is shutting down count 635
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
error gocql: connection closed waiting for response count 101
error gocql: no hosts available in the pool count 10369976
```

I expect gocql.Session to continue serving queries. The behavior appears to be the same when any node in the cluster is removed and then recreated.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.