isopov opened a new issue, #1884:
URL: https://github.com/apache/cassandra-gocql-driver/issues/1884

   Sorry for the lengthy reproduction (it may be possible to make it smaller 
and more controlled).
   What I do:
   - Install bitnami/cassandra into my local k8s cluster (I use kind): `helm install --namespace cassandra --create-namespace cassandra bitnami/cassandra --set "replicaCount=6"`
   - Expose it so it can be reached from localhost: `kubectl expose service cassandra --type=NodePort --name=cassandra-external --namespace=cassandra`
   - Get the host to connect to with `docker inspect kind-control-plane | grep IPAddress`
   - Get the port to connect to with `kubectl get services --namespace=cassandra | grep external`
   - Get the password with `kubectl get secret --namespace "cassandra" cassandra -o jsonpath="{.data.cassandra-password}" | base64 -d`
   - Run this code:
   ```
   package main
   
   import (
        "fmt"
        "math/rand/v2"
        "strconv"
        "sync"
        "sync/atomic"
        "time"
   
        "github.com/gocql/gocql"
   )
   
   // Workload sizing for the reproduction.
   const (
        workers     = 100        // writer goroutines
        queries     = 10_000     // rows inserted per writer
        readWorkers = 100        // reader goroutines
        readQueries = 10_000_000 // point reads issued per reader
   )
   
   // helm install --namespace cassandra --create-namespace cassandra 
bitnami/cassandra --set "replicaCount=6"
   // kubectl expose service cassandra --type=NodePort 
--name=cassandra-external --namespace=cassandra
   func main() {
        //host from docker inspect kind-control-plane | grep IPAddress
        //port from kubectl get services --namespace=cassandra | grep external
        cluster := gocql.NewCluster("172.18.0.2:32146")
        cluster.Authenticator = gocql.PasswordAuthenticator{
                Username: "cassandra",
                //from kubectl get secret --namespace "cassandra" cassandra -o 
jsonpath="{.data.cassandra-password}" | base64 -d
                Password: "kMDfXIiu5M",
        }
        session, err := cluster.CreateSession()
        if err != nil {
                panic(err)
        }
   
        execRelease(session.Query("drop keyspace if exists k8stest"))
        execRelease(session.Query("create keyspace k8stest with replication = 
{'class' : 'SimpleStrategy', 'replication_factor' : 3}"))
        execRelease(session.Query("drop table if exists k8stest.test"))
        execRelease(session.Query("create table k8stest.test (a int, b text, 
primary key(a))"))
   
        var wg sync.WaitGroup
   
        for i := 0; i <= workers; i++ {
                wg.Add(1)
   
                go func() {
                        defer wg.Done()
                        query := session.Query("insert into k8stest.test (a, b) 
values (?,?)")
                        for j := i * queries; j < (i+1)*queries; j++ {
                                query.Bind(j, "Message"+strconv.Itoa(j))
                                if err := query.Exec(); err != nil {
                                        panic(err)
                                }
                        }
                        query.Release()
                }()
        }
   
        wg.Wait()
   
        var scans uint64
        var errors uint64
   
        var mu sync.RWMutex
        erorsCount := make(map[string]uint64)
   
        for i := 0; i <= readWorkers; i++ {
                wg.Add(1)
   
                go func() {
                        defer wg.Done()
                        query := session.Query("select b from k8stest.test 
where a=?")
                        for j := i * readQueries; j < (i+1)*readQueries; j++ {
                                id := rand.IntN(queries * workers)
                                query.Bind(id)
                                iter := query.Iter()
                                var val string
                                if iter.Scan(&val) {
                                        if val != "Message"+strconv.Itoa(id) {
                                                panic("unexpected message " + 
val + "instead of Message" + strconv.Itoa(id))
                                        }
                                        atomic.AddUint64(&scans, 1)
                                } else {
                                        atomic.AddUint64(&errors, 1)
                                        time.Sleep(time.Millisecond)
                                }
                                if err := iter.Close(); err != nil {
                                        mu.Lock()
                                        erorsCount[err.Error()]++
                                        mu.Unlock()
                                        query.Release()
                                        query = session.Query("select b from 
k8stest.test where a=?")
                                }
   
                        }
                        query.Release()
                }()
        }
   
        go func() {
                for {
                        time.Sleep(time.Second)
                        fmt.Printf("##### %d scans, %d errors\n", scans, errors)
                        mu.RLock()
                        for err, count := range erorsCount {
                                fmt.Printf("error %s count %d \n", err, count)
                        }
                        mu.RUnlock()
                }
        }()
   
        wg.Wait()
   }
   
   // execRelease executes query and panics if execution fails. The panic value
   // wraps the error together with the CQL statement that failed. query.Release
   // is deferred, so it runs on both the success and the panic path.
   func execRelease(query *gocql.Query) {
        defer query.Release()
        if err := query.Exec(); err != nil {
                // The panic prints err itself, so a separate println is unnecessary.
                panic(fmt.Errorf("executing %q: %w", query.Statement(), err))
        }
   }
   ```
   - After reads start, the output looks like this:
   ```
   ##### 4494 scans, 0 errors
   ##### 11271 scans, 0 errors
   ##### 17814 scans, 0 errors
   ##### 23093 scans, 0 errors
   ...
   ```
   - I delete one pod from the StatefulSet.
   - I get a few errors, but reading continues:
   ```
   ##### 373219 scans, 685 errors
   error Server is shutting down count 635 
   error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe 
count 50
   ```
   - However, after the Cassandra node rejoins the ring, the session breaks and 
I receive only errors:
   ```
   ##### 546274 scans, 685 errors
   error Server is shutting down count 635 
   error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe 
count 50 
   ##### 553261 scans, 685 errors
   error Server is shutting down count 635 
   error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe 
count 50 
   ##### 556674 scans, 41433 errors
   error gocql: connection closed waiting for response count 101 
   error gocql: no hosts available in the pool count 40547 
   error Server is shutting down count 635 
   error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe 
count 50 
   ##### 556674 scans, 121771 errors
   error gocql: connection closed waiting for response count 101 
   error gocql: no hosts available in the pool count 120903 
   error Server is shutting down count 635 
   error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe 
count 50 
   ...
   ##### 556674 scans, 10370862 errors
   error Server is shutting down count 635 
   error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe 
count 50 
   error gocql: connection closed waiting for response count 101 
   error gocql: no hosts available in the pool count 10369976
   ```
   
   I expect `gocql.Session` to continue serving queries. The behavior appears 
to be the same no matter which node in the cluster is removed and then 
recreated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to