This is an automated email from the ASF dual-hosted git repository.
joaoreis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c46f52a Introduce configurable schema metadata caching and fix
token-aware routing for multi-keyspace workloads
3c46f52a is described below
commit 3c46f52a62e38098b6d49f82415cd730e0329ac3
Author: Raj <[email protected]>
AuthorDate: Tue Feb 24 13:19:45 2026 -0800
Introduce configurable schema metadata caching and fix token-aware routing
for multi-keyspace workloads
TokenAwareHostPolicy only maintained replica maps for the default keyspace,
causing queries to other keyspaces to route only to primary replicas, leading
to hotspotting and poor load distribution.
This change introduces MetadataCacheMode to control schema caching (Full,
KeyspaceOnly, or Disabled) with asynchronous refresh via refreshDebouncer.
TokenAwareHostPolicy now maintains replica maps for all keyspaces and updates
them on schema and topology changes through a notifier interface, ensuring
correct token-aware routing across multi-keyspace workloads.
Patch by Raj Ummadisetty; reviewed by joao-r-reis, worryg0d for CASSGO-104,
CASSGO-107
---
CHANGELOG.md | 2 +
cassandra_test.go | 536 ++++++++++++++++++++++++++---------
cluster.go | 33 +++
common_test.go | 7 +-
conn.go | 6 +-
control.go | 13 +
events.go | 27 +-
host_source.go | 4 +
metadata.go | 832 +++++++++++++++++++++++++++++++++++++++++++++---------
policies.go | 108 +++++--
policies_test.go | 462 ++++++++++++++++++++++++------
session.go | 26 +-
topology.go | 84 +++++-
topology_test.go | 36 +--
14 files changed, 1742 insertions(+), 434 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5398820c..fc14c3de 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0
- Session.StatementMetadata (CASSGO-92)
- NewLogFieldIP, NewLogFieldError, NewLogFieldStringer, NewLogFieldString,
NewLogFieldInt, NewLogFieldBool (CASSGO-92)
+- Introduced configurable schema metadata caching modes to control what
metadata is cached (CASSGO-107)
### Fixed
@@ -20,6 +21,7 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0
- Use protocol downgrading approach during protocol negotiation (CASSGO-97)
- Prevent panic in compileMetadata() when final func is not defined for an
aggregate (CASSGO-105)
- Framer drops error silently (CASSGO-108)
+- TokenAwareHostPolicy now populates replica maps for non-default keyspaces
(CASSGO-104)
## [2.0.0]
diff --git a/cassandra_test.go b/cassandra_test.go
index 937a7c05..7665e17d 100644
--- a/cassandra_test.go
+++ b/cassandra_test.go
@@ -2230,12 +2230,21 @@ func TestEmptyTimestamp(t *testing.T) {
}
}
-// Integration test of just querying for data from the system.schema_keyspace
table where the keyspace DOES exist.
+// Integration test of querying for data from the system.schema_keyspace table
for a single keyspace and for all keyspaces.
func TestGetKeyspaceMetadata(t *testing.T) {
session := createSession(t)
defer session.Close()
+ t.Run("SingleKeyspace", func(t *testing.T) {
+ keyspaceMetadata, err := getKeyspaceMetadata(session,
"gocql_test")
+ assertGetKeyspaceMetadata(t, keyspaceMetadata, err)
+ })
+ t.Run("AllKeyspaces", func(t *testing.T) {
+ keyspacesMetadata, err := getAllKeyspaceMetadata(session)
+ assertGetKeyspaceMetadata(t, keyspacesMetadata["gocql_test"],
err)
+ })
+}
- keyspaceMetadata, err := getKeyspaceMetadata(session, "gocql_test")
+func assertGetKeyspaceMetadata(t *testing.T, keyspaceMetadata
*KeyspaceMetadata, err error) {
if err != nil {
t.Fatalf("failed to query the keyspace metadata with err: %v",
err)
}
@@ -2276,16 +2285,25 @@ func TestGetKeyspaceMetadataFails(t *testing.T) {
}
}
-// Integration test of just querying for data from the
system.schema_columnfamilies table
-func TestGetTableMetadata(t *testing.T) {
+// Integration test of querying for table data from the
system.schema_columnfamilies table for a single keyspace and for all keyspaces
+func TestGetAllTableMetadata(t *testing.T) {
session := createSession(t)
defer session.Close()
-
if err := createTable(session, "CREATE TABLE
gocql_test.test_table_metadata (first_id int, second_id int, third_id int,
PRIMARY KEY (first_id, second_id))"); err != nil {
t.Fatalf("failed to create table with error '%v'", err)
}
+ t.Run("SingleKeyspace", func(t *testing.T) {
+ tables, err := getTableMetadata(session, "gocql_test")
+ assertGetTableMetadata(t, session, tables, err)
+ })
+ t.Run("AllKeyspaces", func(t *testing.T) {
+ tables, err := getAllTablesMetadata(session)
+ assertGetTableMetadata(t, session, tables["gocql_test"], err)
+ })
+}
+
+func assertGetTableMetadata(t *testing.T, session *Session, tables
[]TableMetadata, err error) {
- tables, err := getTableMetadata(session, "gocql_test")
if err != nil {
t.Fatalf("failed to query the table metadata with err: %v", err)
}
@@ -2360,7 +2378,18 @@ func TestGetColumnMetadata(t *testing.T) {
t.Fatalf("failed to create index with err: %v", err)
}
- columns, err := getColumnMetadata(session, "gocql_test")
+ t.Run("SingleKeyspace", func(t *testing.T) {
+ columns, err := getColumnMetadata(session, "gocql_test")
+ assertGetColumnMetadata(t, session, columns, err)
+ })
+
+ t.Run("AllKeyspaces", func(t *testing.T) {
+ columns, err := getAllColumnMetadata(session)
+ assertGetColumnMetadata(t, session, columns["gocql_test"], err)
+ })
+}
+
+func assertGetColumnMetadata(t *testing.T, session *Session, columns
[]ColumnMetadata, err error) {
if err != nil {
t.Fatalf("failed to query column metadata with err: %v", err)
}
@@ -2457,8 +2486,17 @@ func TestMaterializedViewMetadata(t *testing.T) {
session := createSession(t)
defer session.Close()
createMaterializedViews(t, session)
+ t.Run("SingleKeyspace", func(t *testing.T) {
+ materializedViews, err := getMaterializedViewsMetadata(session,
"gocql_test")
+ assertMaterializedViewMetadata(t, materializedViews, err)
+ })
+ t.Run("AllKeyspaces", func(t *testing.T) {
+ materializedViews, err :=
getAllMaterializedViewsMetadata(session)
+ assertMaterializedViewMetadata(t,
materializedViews["gocql_test"], err)
+ })
+}
- materializedViews, err := getMaterializedViewsMetadata(session,
"gocql_test")
+func assertMaterializedViewMetadata(t *testing.T, materializedViews
[]MaterializedViewMetadata, err error) {
if err != nil {
t.Fatalf("failed to query view metadata with err: %v", err)
}
@@ -2542,8 +2580,17 @@ func TestAggregateMetadata(t *testing.T) {
session := createSession(t)
defer session.Close()
createAggregate(t, session)
+ t.Run("SingleKeyspace", func(t *testing.T) {
+ aggregates, err := getAggregatesMetadata(session, "gocql_test")
+ assertAggregateMetadata(t, aggregates, err)
+ })
+ t.Run("AllKeyspaces", func(t *testing.T) {
+ aggregates, err := getAllAggregatesMetadata(session)
+ assertAggregateMetadata(t, aggregates["gocql_test"], err)
+ })
+}
- aggregates, err := getAggregatesMetadata(session, "gocql_test")
+func assertAggregateMetadata(t *testing.T, aggregates []AggregateMetadata, err
error) {
if err != nil {
t.Fatalf("failed to query aggregate metadata with err: %v", err)
}
@@ -2590,8 +2637,18 @@ func TestFunctionMetadata(t *testing.T) {
session := createSession(t)
defer session.Close()
createFunctions(t, session)
+ t.Run("SingleKeyspace", func(t *testing.T) {
+ functions, err := getFunctionsMetadata(session, "gocql_test")
+ assertFunctionMetadata(t, functions, err)
+ })
+ t.Run("AllKeyspaces", func(t *testing.T) {
+ functions, err := getAllFunctionsMetadata(session)
+ assertFunctionMetadata(t, functions["gocql_test"], err)
+ })
+
+}
- functions, err := getFunctionsMetadata(session, "gocql_test")
+func assertFunctionMetadata(t *testing.T, functions []FunctionMetadata, err
error) {
if err != nil {
t.Fatalf("failed to query function metadata with err: %v", err)
}
@@ -2661,8 +2718,31 @@ func TestFunctionMetadata(t *testing.T) {
}
}
-// Integration test of querying and composition the keyspace metadata
+// Integration test of querying keyspace metadata with different
MetadataCacheMode settings
func TestKeyspaceMetadata(t *testing.T) {
+ testCases := []struct {
+ name string
+ cacheMode MetadataCacheMode
+ // When true, expect full metadata (tables, aggregates, views,
types)
+ // When false, only expect keyspace-level metadata
+ expectFullMetadata bool
+ }{
+ {
+ name: "Full",
+ cacheMode: Full,
+ expectFullMetadata: true,
+ },
+ {
+ name: "KeyspaceOnly",
+ cacheMode: KeyspaceOnly,
+ expectFullMetadata: false,
+ },
+ {
+ name: "Disabled",
+ cacheMode: Disabled,
+ expectFullMetadata: true,
+ },
+ }
session := createSession(t)
defer session.Close()
@@ -2677,119 +2757,156 @@ func TestKeyspaceMetadata(t *testing.T) {
t.Fatalf("failed to create index with err: %v", err)
}
- keyspaceMetadata, err := session.KeyspaceMetadata("gocql_test")
- if err != nil {
- t.Fatalf("failed to query keyspace metadata with err: %v", err)
- }
- if keyspaceMetadata == nil {
- t.Fatal("expected the keyspace metadata to not be nil, but it
was nil")
- }
- if keyspaceMetadata.Name != session.cfg.Keyspace {
- t.Fatalf("Expected the keyspace name to be %s but was %s",
session.cfg.Keyspace, keyspaceMetadata.Name)
- }
- if len(keyspaceMetadata.Tables) == 0 {
- t.Errorf("Expected tables but there were none")
- }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ session := createSession(t, func(config *ClusterConfig)
{
+ config.MetadataCacheMode = tc.cacheMode
+ })
+ defer session.Close()
+ // Query keyspace metadata
+ keyspaceMetadata, err :=
session.KeyspaceMetadata("gocql_test")
+ if err != nil {
+ t.Fatalf("failed to query keyspace metadata
with err: %v", err)
+ }
+ if keyspaceMetadata == nil {
+ t.Fatal("expected the keyspace metadata to not
be nil, but it was nil")
+ }
+ if keyspaceMetadata.Name != session.cfg.Keyspace {
+ t.Fatalf("Expected the keyspace name to be %s
but was %s", session.cfg.Keyspace, keyspaceMetadata.Name)
+ }
+ // When cache mode is Disabled, verify that the cache
is empty
+ if tc.cacheMode == Disabled {
+ cachedMeta :=
session.schemaDescriber.getSchemaMetaForRead()
+ if cachedMeta != nil &&
len(cachedMeta.keyspaceMeta) > 0 {
+ t.Errorf("Expected empty cache in
Disabled mode, but found %d keyspaces cached", len(cachedMeta.keyspaceMeta))
+ }
+ }
- tableMetadata, found := keyspaceMetadata.Tables["test_metadata"]
- if !found {
- t.Fatalf("failed to find the test_metadata table metadata")
- }
+ if tc.expectFullMetadata {
+ if len(keyspaceMetadata.Tables) == 0 {
+ t.Errorf("Expected tables but there
were none")
+ }
- if len(tableMetadata.PartitionKey) != 1 {
- t.Errorf("expected partition key length of 1, but was %d",
len(tableMetadata.PartitionKey))
- }
- for i, column := range tableMetadata.PartitionKey {
- if column == nil {
- t.Errorf("partition key column metadata at index %d was
nil", i)
- }
- }
- if tableMetadata.PartitionKey[0].Name != "first_id" {
- t.Errorf("Expected the first partition key column to be
'first_id' but was '%s'", tableMetadata.PartitionKey[0].Name)
- }
- if len(tableMetadata.ClusteringColumns) != 1 {
- t.Fatalf("expected clustering columns length of 1, but was %d",
len(tableMetadata.ClusteringColumns))
- }
- for i, column := range tableMetadata.ClusteringColumns {
- if column == nil {
- t.Fatalf("clustering column metadata at index %d was
nil", i)
- }
- }
- if tableMetadata.ClusteringColumns[0].Name != "second_id" {
- t.Errorf("Expected the first clustering column to be
'second_id' but was '%s'", tableMetadata.ClusteringColumns[0].Name)
- }
- thirdColumn, found := tableMetadata.Columns["third_id"]
- if !found {
- t.Fatalf("Expected a column definition for 'third_id'")
- }
- if !session.useSystemSchema && thirdColumn.Index.Name !=
"index_metadata" {
- // TODO(zariel): scan index info from system_schema
- t.Errorf("Expected column index named 'index_metadata' but was
'%s'", thirdColumn.Index.Name)
- }
+ tableMetadata, found :=
keyspaceMetadata.Tables["test_metadata"]
+ if !found {
+ t.Fatalf("failed to find the
test_metadata table metadata")
+ }
- aggregate, found := keyspaceMetadata.Aggregates["average"]
- if !found {
- t.Fatal("failed to find the aggregate 'average' in metadata")
- }
- if aggregate.FinalFunc.Name != "avgfinal" {
- t.Fatalf("expected final function %s, but got %s", "avgFinal",
aggregate.FinalFunc.Name)
- }
- if aggregate.StateFunc.Name != "avgstate" {
- t.Fatalf("expected state function %s, but got %s", "avgstate",
aggregate.StateFunc.Name)
- }
- aggregate, found = keyspaceMetadata.Aggregates["average2"]
- if !found {
- t.Fatal("failed to find the aggregate 'average2' in metadata")
- }
- if aggregate.FinalFunc.Name != "avgfinal" {
- t.Fatalf("expected final function %s, but got %s", "avgFinal",
aggregate.FinalFunc.Name)
- }
- if aggregate.StateFunc.Name != "avgstate" {
- t.Fatalf("expected state function %s, but got %s", "avgstate",
aggregate.StateFunc.Name)
- }
- _, found = keyspaceMetadata.UserTypes["basicview"]
- if !found {
- t.Fatal("failed to find the types in metadata")
- }
- textType := TypeText
- if flagCassVersion.Before(3, 0, 0) {
- textType = TypeVarchar
- }
- expectedType := UserTypeMetadata{
- Keyspace: "gocql_test",
- Name: "basicview",
- FieldNames: []string{"birthday", "nationality", "weight",
"height"},
- FieldTypes: []TypeInfo{
- timestampTypeInfo{},
- varcharLikeTypeInfo{
- typ: textType,
- },
- varcharLikeTypeInfo{
- typ: textType,
- },
- varcharLikeTypeInfo{
- typ: textType,
- },
- },
- }
- if !reflect.DeepEqual(*keyspaceMetadata.UserTypes["basicview"],
expectedType) {
- t.Fatalf("type is %#v, but expected %#v",
keyspaceMetadata.UserTypes["basicview"], expectedType)
- }
- if flagCassVersion.Major >= 3 {
- materializedView, found :=
keyspaceMetadata.MaterializedViews["view_view"]
- if !found {
- t.Fatal("failed to find materialized view view_view in
metadata")
- }
- if materializedView.BaseTable.Name != "view_table" {
- t.Fatalf("expected name: %s, materialized view base
table name: %s", "view_table", materializedView.BaseTable.Name)
- }
- materializedView, found =
keyspaceMetadata.MaterializedViews["view_view2"]
- if !found {
- t.Fatal("failed to find materialized view view_view2 in
metadata")
- }
- if materializedView.BaseTable.Name != "view_table2" {
- t.Fatalf("expected name: %s, materialized view base
table name: %s", "view_table2", materializedView.BaseTable.Name)
- }
+ if len(tableMetadata.PartitionKey) != 1 {
+ t.Errorf("expected partition key length
of 1, but was %d", len(tableMetadata.PartitionKey))
+ }
+ for i, column := range
tableMetadata.PartitionKey {
+ if column == nil {
+ t.Errorf("partition key column
metadata at index %d was nil", i)
+ }
+ }
+ if tableMetadata.PartitionKey[0].Name !=
"first_id" {
+ t.Errorf("Expected the first partition
key column to be 'first_id' but was '%s'", tableMetadata.PartitionKey[0].Name)
+ }
+ if len(tableMetadata.ClusteringColumns) != 1 {
+ t.Fatalf("expected clustering columns
length of 1, but was %d", len(tableMetadata.ClusteringColumns))
+ }
+ for i, column := range
tableMetadata.ClusteringColumns {
+ if column == nil {
+ t.Fatalf("clustering column
metadata at index %d was nil", i)
+ }
+ }
+ if tableMetadata.ClusteringColumns[0].Name !=
"second_id" {
+ t.Errorf("Expected the first clustering
column to be 'second_id' but was '%s'", tableMetadata.ClusteringColumns[0].Name)
+ }
+ thirdColumn, found :=
tableMetadata.Columns["third_id"]
+ if !found {
+ t.Fatalf("Expected a column definition
for 'third_id'")
+ }
+ if !session.useSystemSchema &&
thirdColumn.Index.Name != "index_metadata" {
+ // TODO(zariel): scan index info from
system_schema
+ t.Errorf("Expected column index named
'index_metadata' but was '%s'", thirdColumn.Index.Name)
+ }
+
+ aggregate, found :=
keyspaceMetadata.Aggregates["average"]
+ if !found {
+ t.Fatal("failed to find the aggregate
'average' in metadata")
+ }
+ if aggregate.FinalFunc.Name != "avgfinal" {
+ t.Fatalf("expected final function %s,
but got %s", "avgFinal", aggregate.FinalFunc.Name)
+ }
+ if aggregate.StateFunc.Name != "avgstate" {
+ t.Fatalf("expected state function %s,
but got %s", "avgstate", aggregate.StateFunc.Name)
+ }
+ aggregate, found =
keyspaceMetadata.Aggregates["average2"]
+ if !found {
+ t.Fatal("failed to find the aggregate
'average2' in metadata")
+ }
+ if aggregate.FinalFunc.Name != "avgfinal" {
+ t.Fatalf("expected final function %s,
but got %s", "avgFinal", aggregate.FinalFunc.Name)
+ }
+ if aggregate.StateFunc.Name != "avgstate" {
+ t.Fatalf("expected state function %s,
but got %s", "avgstate", aggregate.StateFunc.Name)
+ }
+ _, found =
keyspaceMetadata.UserTypes["basicview"]
+ if !found {
+ t.Fatal("failed to find the types in
metadata")
+ }
+ textType := TypeText
+ if flagCassVersion.Before(3, 0, 0) {
+ textType = TypeVarchar
+ }
+ expectedType := UserTypeMetadata{
+ Keyspace: "gocql_test",
+ Name: "basicview",
+ FieldNames: []string{"birthday",
"nationality", "weight", "height"},
+ FieldTypes: []TypeInfo{
+ timestampTypeInfo{},
+ varcharLikeTypeInfo{
+ typ: textType,
+ },
+ varcharLikeTypeInfo{
+ typ: textType,
+ },
+ varcharLikeTypeInfo{
+ typ: textType,
+ },
+ },
+ }
+ if
!reflect.DeepEqual(*keyspaceMetadata.UserTypes["basicview"], expectedType) {
+ t.Fatalf("type is %#v, but expected
%#v", keyspaceMetadata.UserTypes["basicview"], expectedType)
+ }
+ if flagCassVersion.Major >= 3 {
+ materializedView, found :=
keyspaceMetadata.MaterializedViews["view_view"]
+ if !found {
+ t.Fatal("failed to find
materialized view view_view in metadata")
+ }
+ if materializedView.BaseTable.Name !=
"view_table" {
+ t.Fatalf("expected name: %s,
materialized view base table name: %s", "view_table",
materializedView.BaseTable.Name)
+ }
+ materializedView, found =
keyspaceMetadata.MaterializedViews["view_view2"]
+ if !found {
+ t.Fatal("failed to find
materialized view view_view2 in metadata")
+ }
+ if materializedView.BaseTable.Name !=
"view_table2" {
+ t.Fatalf("expected name: %s,
materialized view base table name: %s", "view_table2",
materializedView.BaseTable.Name)
+ }
+ }
+ } else {
+ // KeyspaceOnly mode should only return
keyspace metadata
+ // Tables, Functions, Aggregates,
MaterializedViews, UserTypes should be nil
+ if keyspaceMetadata.Tables != nil {
+ t.Errorf("Expected no tables in
KeyspaceOnly mode, but got %d tables", len(keyspaceMetadata.Tables))
+ }
+ if keyspaceMetadata.Aggregates != nil {
+ t.Errorf("Expected no aggregates in
KeyspaceOnly mode, but got %d aggregates", len(keyspaceMetadata.Aggregates))
+ }
+ if keyspaceMetadata.Functions != nil {
+ t.Errorf("Expected no functions in
KeyspaceOnly mode, but got %d functions", len(keyspaceMetadata.Functions))
+ }
+ if keyspaceMetadata.UserTypes != nil {
+ t.Errorf("Expected no user types in
KeyspaceOnly mode, but got %d types", len(keyspaceMetadata.UserTypes))
+ }
+ if keyspaceMetadata.MaterializedViews != nil {
+ t.Errorf("Expected no materialized
views in KeyspaceOnly mode, but got %d views",
len(keyspaceMetadata.MaterializedViews))
+ }
+ }
+ })
}
}
@@ -2920,14 +3037,16 @@ func TestRoutingStatementMetadata(t *testing.T) {
// Integration test of the token-aware policy-based connection pool
func TestTokenAwareConnPool(t *testing.T) {
cluster := createCluster()
- cluster.PoolConfig.HostSelectionPolicy =
TokenAwareHostPolicy(RoundRobinHostPolicy())
+ // Create a dedicated keyspace with RF=1 for deterministic token-aware
routing
+ createKeyspaceWithRF(t, cluster, "test_token_aware_ks", 1)
+ cluster.PoolConfig.HostSelectionPolicy =
TokenAwareHostPolicy(RoundRobinHostPolicy())
+ cluster.Logger = NewLogger(LogLevelDebug)
// force metadata query to page
cluster.PageSize = 1
session := createSessionFromCluster(cluster, t)
defer session.Close()
-
expectedPoolSize := cluster.NumConns * len(session.ring.allHosts())
// wait for pool to fill
@@ -2943,25 +3062,180 @@ func TestTokenAwareConnPool(t *testing.T) {
}
// add another cf so there are two pages when fetching table metadata
from our keyspace
- if err := createTable(session, "CREATE TABLE
gocql_test.test_token_aware_other_cf (id int, data text, PRIMARY KEY (id))");
err != nil {
- t.Fatalf("failed to create test_token_aware table with err:
%v", err)
+ if err := createTable(session, "CREATE TABLE
test_token_aware_ks.test_token_aware_other_cf (id int, data text, PRIMARY KEY
(id))"); err != nil {
+ t.Fatalf("failed to create test_token_aware_other_cf table with
err: %v", err)
}
- if err := createTable(session, "CREATE TABLE
gocql_test.test_token_aware (id int, data text, PRIMARY KEY (id))"); err != nil
{
+ if err := createTable(session, "CREATE TABLE
test_token_aware_ks.test_token_aware (id int, data text, PRIMARY KEY (id))");
err != nil {
t.Fatalf("failed to create test_token_aware table with err:
%v", err)
}
- query := session.Query("INSERT INTO test_token_aware (id, data) VALUES
(?,?)", 42, "8 * 6 =")
+ query := session.Query("INSERT INTO
test_token_aware_ks.test_token_aware (id, data) VALUES (?,?)", 42, "8 * 6 =")
if err := query.Exec(); err != nil {
t.Fatalf("failed to insert with err: %v", err)
}
- query = session.Query("SELECT data FROM test_token_aware where id = ?",
42).Consistency(One)
+ // Verify token-aware routing using tracing: queries with the same
partition key should
+ // consistently go to the same coordinator with no hops to other nodes
+ type traceCapture struct {
+ coordinator string
+ sources []string
+ }
+
+ var coordinators []string
+ var allSources [][]string
var data string
- if err := query.Scan(&data); err != nil {
- t.Error(err)
+
+ // Execute the same query 5 times with the same partition key
+ for i := 0; i < 5; i++ {
+ var capturedTrace traceCapture
+ queryNum := i + 1
+ tracer := newTestTracer(session, func(coordinator string,
sources []string, err error) {
+ if err != nil {
+ t.Fatalf("query %d: failed to collect trace
data: %v", queryNum, err)
+ }
+ capturedTrace.coordinator = coordinator
+ capturedTrace.sources = sources
+ })
+
+ query = session.Query("SELECT data FROM
test_token_aware_ks.test_token_aware where id = ?", 42).
+ Consistency(One).
+ Trace(tracer)
+ if err := query.Scan(&data); err != nil {
+ t.Errorf("query %d failed: %v", queryNum, err)
+ }
+
+ coordinators = append(coordinators, capturedTrace.coordinator)
+ allSources = append(allSources, capturedTrace.sources)
+ }
+
+ if len(coordinators) != 5 {
+ t.Fatalf("expected 5 traced queries, got %d", len(coordinators))
}
- // TODO add verification that the query went to the correct host
+ // Verify all queries went to the same coordinator
+ firstCoordinator := coordinators[0]
+ for i, coord := range coordinators {
+ if coord != firstCoordinator {
+ t.Errorf("Token-aware routing failed: query %d went to
coordinator %s, but query 1 went to %s",
+ i+1, coord, firstCoordinator)
+ }
+ }
+
+ // Verify no hops for any query (all trace events should originate from
the coordinator)
+ for queryNum, sources := range allSources {
+ coordinator := coordinators[queryNum]
+ for eventNum, source := range sources {
+ if source != coordinator {
+ t.Errorf("Query %d trace event %d came from %s,
but coordinator is %s (indicates query was forwarded)",
+ queryNum+1, eventNum+1, source,
coordinator)
+ }
+ }
+ }
+}
+
+// testTracer is a custom tracer for testing that captures coordinator and
event sources
+type testTracer struct {
+ session *Session
+ onTrace func(coordinator string, sources []string, err error)
+ maxAttempts int // Number of retry attempts (default: 5)
+ retryDelay time.Duration // Delay between retries (default: 400ms)
+}
+
+// newTestTracer creates a new testTracer with default retry settings
+func newTestTracer(session *Session, onTrace func(coordinator string, sources
[]string, err error)) *testTracer {
+ return &testTracer{
+ session: session,
+ onTrace: onTrace,
+ maxAttempts: 5,
+ retryDelay: 400 * time.Millisecond,
+ }
+}
+
+func (t *testTracer) Trace(traceId []byte) {
+ var (
+ coordinator string
+ duration int
+ )
+
+ // Use configured retry parameters, or defaults if not set
+ maxAttempts := t.maxAttempts
+ if maxAttempts == 0 {
+ maxAttempts = 5 // default
+ }
+ retryDelay := t.retryDelay
+ if retryDelay == 0 {
+ retryDelay = 400 * time.Millisecond // default
+ }
+
+ var found bool
+ for attempt := 1; attempt <= maxAttempts; attempt++ {
+ iter := t.session.control.query(`SELECT coordinator, duration
+ FROM system_traces.sessions
+ WHERE session_id = ?`, traceId)
+
+ // Scan returns true if a row was found
+ found = iter.Scan(&coordinator, &duration)
+ if err := iter.Close(); err != nil {
+ if t.onTrace != nil {
+ t.onTrace("", nil, fmt.Errorf("failed to query
trace sessions: %w", err))
+ }
+ return
+ }
+
+ // If we got a row with duration > 0, the trace is complete and
all events are published
+ if found && duration > 0 {
+ break
+ }
+
+ // If not the last attempt, wait before retrying
+ if attempt < maxAttempts {
+ t.session.logger.Debug("Trace data not ready, retrying
after delay",
+ NewLogFieldString("retryDelay",
retryDelay.String()),
+ NewLogFieldInt("attempt", attempt),
+ NewLogFieldInt("maxAttempts", maxAttempts),
+ NewLogFieldBool("found", found),
+ NewLogFieldInt("duration", duration))
+ time.Sleep(retryDelay)
+ }
+ }
+
+ // If we still didn't find complete trace data after all attempts, call
callback with error
+ if !found || duration == 0 {
+ if t.onTrace != nil {
+ t.onTrace("", nil, fmt.Errorf("trace data not available
after %d attempts (found=%v, duration=%d)", maxAttempts, found, duration))
+ }
+ return
+ }
+
+ var sources []string
+ iter := t.session.control.query(`SELECT *
+ FROM system_traces.events
+ WHERE session_id = ?`, traceId)
+
+ results, err := iter.SliceMap()
+ if err != nil {
+ if t.onTrace != nil {
+ t.onTrace("", nil, fmt.Errorf("failed to read trace
events: %w", err))
+ }
+ return
+ }
+
+ t.session.logger.Debug("Got trace events.",
NewLogFieldString("results", fmt.Sprintf("%s", results)))
+
+ for _, row := range results {
+ sources = append(sources, row["source"].(net.IP).String())
+ }
+
+ if err := iter.Close(); err != nil {
+ if t.onTrace != nil {
+ t.onTrace("", nil, fmt.Errorf("failed to read trace
events: %w", err))
+ }
+ return
+ }
+
+ if t.onTrace != nil {
+ t.onTrace(coordinator, sources, nil)
+ }
}
func TestNegativeStream(t *testing.T) {
diff --git a/cluster.go b/cluster.go
index ebd4a186..fc556cd3 100644
--- a/cluster.go
+++ b/cluster.go
@@ -42,6 +42,36 @@ type PoolConfig struct {
HostSelectionPolicy HostSelectionPolicy
}
+// MetadataCacheMode controls how the driver reads and caches schema metadata
from Cassandra system tables.
+// This affects the behavior of Session.KeyspaceMetadata and token-aware host
selection policies.
+//
+// See the individual mode constants (Full, KeyspaceOnly, Disabled) for
detailed behavior of each mode.
+type MetadataCacheMode int
+
+const (
+ // Full mode reads and caches all schema metadata including keyspaces,
tables, columns,
+ // functions, aggregates, user-defined types, and materialized views.
+ //
+ // Token-aware routing works normally (if TokenAwareHostPolicy is used)
with full replica information.
+ // Session.KeyspaceMetadata returns cached metadata without querying
system tables.
+ Full MetadataCacheMode = iota
+
+ // KeyspaceOnly mode reads and caches only keyspace metadata
(replication strategy and options).
+ // This enables token-aware routing (if TokenAwareHostPolicy is used)
without the overhead of caching detailed schema information.
+ //
+ // Token-aware routing works normally (if TokenAwareHostPolicy is used)
with full replica information.
+ // Session.KeyspaceMetadata returns cached keyspace metadata, but
Tables, Functions, Aggregates,
+ // MaterializedViews, and UserTypes fields will be nil.
+ KeyspaceOnly
+
+ // Disabled mode completely disables schema metadata caching.
+ //
+ // Token-aware routing falls back to the configured fallback policy
(e.g., RoundRobinHostPolicy,
+ // DCAwareRoundRobinPolicy) since replica information is not available.
+ // Session.KeyspaceMetadata queries system tables on every call instead
of using a cache.
+ Disabled
+)
+
func (p PoolConfig) buildPool(session *Session) *policyConnPool {
return newPolicyConnPool(session)
}
@@ -286,6 +316,8 @@ type ClusterConfig struct {
// internal config for testing
disableControlConn bool
+
+ MetadataCacheMode MetadataCacheMode
}
// Dialer is the interface that wraps the DialContext method for establishing
network connections to Cassandra nodes.
@@ -325,6 +357,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries:
3, Interval: 1 * time.Second},
WriteCoalesceWaitTime: 200 * time.Microsecond,
NextPagePrefetch: 0.25,
+ MetadataCacheMode: Full,
}
return cfg
}
diff --git a/common_test.go b/common_test.go
index 299af3b3..0ae75015 100644
--- a/common_test.go
+++ b/common_test.go
@@ -139,6 +139,11 @@ func createCluster(opts ...func(*ClusterConfig))
*ClusterConfig {
}
func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
+ createKeyspaceWithRF(tb, cluster, keyspace, *flagRF)
+}
+
+// createKeyspaceWithRF creates a keyspace with a specific replication factor
+func createKeyspaceWithRF(tb testing.TB, cluster *ClusterConfig, keyspace
string, rf int) {
// TODO: tb.Helper()
c := *cluster
c.Keyspace = "system"
@@ -158,7 +163,7 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig,
keyspace string) {
WITH replication = {
'class' : 'SimpleStrategy',
'replication_factor' : %d
- }`, keyspace, *flagRF))
+ }`, keyspace, rf))
if err != nil {
panic(fmt.Sprintf("unable to create keyspace: %v", err))
diff --git a/conn.go b/conn.go
index a9bb4f5a..a615129d 100644
--- a/conn.go
+++ b/conn.go
@@ -1972,13 +1972,17 @@ func (c *Conn) querySystemLocal(ctx context.Context)
*Iter {
}
func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
+ return c.awaitSchemaAgreementWithTimeout(ctx,
c.session.cfg.MaxWaitSchemaAgreement)
+}
+
+func (c *Conn) awaitSchemaAgreementWithTimeout(ctx context.Context, timeout
time.Duration) (err error) {
const localSchemas = "SELECT schema_version FROM system.local WHERE
key='local'"
var versions map[string]struct{}
var schemaVersion string
var rows []map[string]interface{}
- endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement)
+ endDeadline := time.Now().Add(timeout)
for time.Now().Before(endDeadline) {
iter := c.querySystemPeers(ctx, c.host.version)
diff --git a/control.go b/control.go
index c518ba68..cc21e089 100644
--- a/control.go
+++ b/control.go
@@ -364,6 +364,13 @@ func (c *controlConn) setupConn(conn *Conn, sessionInit
bool) error {
NewLogFieldIP("host_addr", host.ConnectAddress()),
NewLogFieldString("host_id", host.HostID()))
if c.session.initialized() {
+ refreshErr := c.session.schemaDescriber.refreshSchemaMetadata()
+ if refreshErr != nil {
+ c.session.logger.Warning("Failed to refresh schema
metadata after reconnecting. "+
+ "Schema might be stale or missing, causing
token-aware routing to fall back to the configured fallback policy. "+
+ "Keyspace metadata queries might fail with
ErrKeyspaceDoesNotExist until schema refresh succeeds.",
+ NewLogFieldError("err", refreshErr))
+ }
// We connected to control conn, so add the connect the host in
pool as well.
// Notify session we can start trying to connect to the node.
// We can't start the fill before the session is initialized,
otherwise the fill would interfere
@@ -599,6 +606,12 @@ func (c *controlConn) awaitSchemaAgreement() error {
}).err
}
+func (c *controlConn) awaitSchemaAgreementWithTimeout(timeout time.Duration)
error {
+ return c.withConn(func(conn *Conn) *Iter {
+ return
newErrIter(conn.awaitSchemaAgreementWithTimeout(context.TODO(), timeout),
&queryMetrics{}, "", nil, nil)
+ }).err
+}
+
func (c *controlConn) close() {
if atomic.CompareAndSwapInt32(&c.state, controlConnStarted,
controlConnClosing) {
c.quit <- struct{}{}
diff --git a/events.go b/events.go
index d511d9ae..b2b3b1eb 100644
--- a/events.go
+++ b/events.go
@@ -122,8 +122,7 @@ func (s *Session) handleEvent(framer *framer) {
switch f := frame.(type) {
case *schemaChangeKeyspace, *schemaChangeFunction,
*schemaChangeTable, *schemaChangeAggregate, *schemaChangeType:
-
- s.schemaEvents.debounce(frame)
+ s.schemaDescriber.debounceRefreshSchemaMetadata()
case *topologyChangeEventFrame, *statusChangeEventFrame:
s.nodeEvents.debounce(frame)
default:
@@ -132,30 +131,6 @@ func (s *Session) handleEvent(framer *framer) {
}
}
-func (s *Session) handleSchemaEvent(frames []frame) {
- // TODO: debounce events
- for _, frame := range frames {
- switch f := frame.(type) {
- case *schemaChangeKeyspace:
- s.schemaDescriber.clearSchema(f.keyspace)
- s.handleKeyspaceChange(f.keyspace, f.change)
- case *schemaChangeTable:
- s.schemaDescriber.clearSchema(f.keyspace)
- case *schemaChangeAggregate:
- s.schemaDescriber.clearSchema(f.keyspace)
- case *schemaChangeFunction:
- s.schemaDescriber.clearSchema(f.keyspace)
- case *schemaChangeType:
- s.schemaDescriber.clearSchema(f.keyspace)
- }
- }
-}
-
-func (s *Session) handleKeyspaceChange(keyspace, change string) {
- s.control.awaitSchemaAgreement()
- s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace,
Change: change})
-}
-
// handleNodeEvent handles inbound status and topology change events.
//
// Status events are debounced by host IP; only the latest event is processed.
diff --git a/host_source.go b/host_source.go
index 622d2294..abffd2dc 100644
--- a/host_source.go
+++ b/host_source.go
@@ -907,6 +907,10 @@ const (
ringRefreshDebounceTime = 1 * time.Second
)
+const (
+ schemaRefreshDebounceTime = 1 * time.Second
+)
+
// debounces requests to call a refresh function (currently used for ring
refresh). It also supports triggering a refresh immediately.
type refreshDebouncer struct {
mu sync.Mutex
diff --git a/metadata.go b/metadata.go
index 7909f22d..08830304 100644
--- a/metadata.go
+++ b/metadata.go
@@ -31,9 +31,11 @@ package gocql
import (
"encoding/hex"
"encoding/json"
+ "errors"
"fmt"
"strings"
- "sync"
+ "sync/atomic"
+ "time"
)
// schema metadata for a keyspace
@@ -42,6 +44,7 @@ type KeyspaceMetadata struct {
DurableWrites bool
StrategyClass string
StrategyOptions map[string]interface{}
+ placementStrategy placementStrategy
Tables map[string]*TableMetadata
Functions map[string]*FunctionMetadata
Aggregates map[string]*AggregateMetadata
@@ -210,7 +213,7 @@ func (c *ColumnKind) UnmarshalCQL(typ TypeInfo, p []byte)
error {
kind, err := columnKindFromSchema(string(p))
if err != nil {
- return err
+ return fmt.Errorf("failed to parse column kind from schema:
%w", err)
}
*c = kind
@@ -243,95 +246,274 @@ const (
// queries the cluster for schema information for a specific keyspace
type schemaDescriber struct {
- session *Session
- mu sync.Mutex
+ session *Session
+ schemaRefresher *refreshDebouncer
+ schemaMeta atomic.Value // *schemaMeta
+}
+
+// Schema change type constants as defined in the Cassandra Native Protocol
specification.
+// These values indicate the nature of schema modifications that occurred.
+//
+// See:
https://cassandra.apache.org/doc/latest/cassandra/reference/native-protocol.html
+//
+// Schema change events are server-initiated messages sent to clients that
have registered
+// for schema change notifications. These events indicate modifications to
keyspaces, tables,
+// user-defined types, functions, or aggregates.
+const (
+ SchemaChangeTypeCreated = "CREATED" // Schema object was created
+ SchemaChangeTypeUpdated = "UPDATED" // Schema object was modified
+ SchemaChangeTypeDropped = "DROPPED" // Schema object was removed
+)
- cache map[string]*KeyspaceMetadata
+type schemaMeta struct {
+ keyspaceMeta map[string]*KeyspaceMetadata
}
// creates a session bound schema describer which will query and cache
// keyspace metadata
-func newSchemaDescriber(session *Session) *schemaDescriber {
- return &schemaDescriber{
+func newSchemaDescriber(session *Session, schemaRefresher *refreshDebouncer)
*schemaDescriber {
+ meta := new(schemaMeta)
+ describer := &schemaDescriber{
session: session,
- cache: map[string]*KeyspaceMetadata{},
}
+ describer.schemaMeta.Store(meta)
+ describer.schemaRefresher = schemaRefresher
+ return describer
+}
+
+func (s *schemaDescriber) getSchemaMetaForRead() *schemaMeta {
+ meta, _ := s.schemaMeta.Load().(*schemaMeta)
+ return meta
+}
+
+func (s *schemaDescriber) getSchemaMetaForUpdate() *schemaMeta {
+ meta := s.getSchemaMetaForRead()
+ metaNew := new(schemaMeta)
+ if meta != nil {
+ *metaNew = *meta
+ }
+ return metaNew
}
// returns the cached KeyspaceMetadata held by the describer for the named
// keyspace.
func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata,
error) {
- s.mu.Lock()
- defer s.mu.Unlock()
+ if s.session.cfg.MetadataCacheMode == Disabled {
+ return s.fetchSchema(keyspaceName)
+ }
+ metadata, found := s.getSchemaMetaForRead().keyspaceMeta[keyspaceName]
- metadata, found := s.cache[keyspaceName]
if !found {
- // refresh the cache for this keyspace
- err := s.refreshSchema(keyspaceName)
- if err != nil {
- return nil, err
- }
-
- metadata = s.cache[keyspaceName]
+ return nil, ErrKeyspaceDoesNotExist
}
return metadata, nil
}
-// clears the already cached keyspace metadata
-func (s *schemaDescriber) clearSchema(keyspaceName string) {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- delete(s.cache, keyspaceName)
-}
-
-// forcibly updates the current KeyspaceMetadata held by the schema describer
-// for a given named keyspace.
-func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
+func (s *schemaDescriber) fetchSchema(keyspaceName string) (*KeyspaceMetadata,
error) {
var err error
// query the system keyspace for schema data
// TODO retrieve concurrently
keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
tables, err := getTableMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
columns, err := getColumnMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
functions, err := getFunctionsMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
aggregates, err := getAggregatesMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
userTypes, err := getUserTypeMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
materializedViews, err := getMaterializedViewsMetadata(s.session,
keyspaceName)
if err != nil {
- return err
+ return nil, err
}
// organize the schema data
compileMetadata(s.session, keyspace, tables, columns, functions,
aggregates, userTypes,
materializedViews)
- // update the cache
- s.cache[keyspaceName] = keyspace
+ return keyspace, nil
+}
+
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+ start := time.Now()
+ var refreshErr error
+ defer func() {
+ elapsed := time.Since(start)
+ if refreshErr != nil {
+ session.logger.Debug("Schema refresh failed",
+ NewLogFieldString("duration", elapsed.String()),
+ NewLogFieldError("err", refreshErr))
+ } else {
+ session.logger.Debug("Schema refresh completed",
+ NewLogFieldString("duration", elapsed.String()))
+ }
+ }()
+
+ if session.cfg.MetadataCacheMode == Disabled {
+ return nil
+ }
+ awaitErr := session.control.awaitSchemaAgreementWithTimeout(10 *
time.Second)
+ if awaitErr != nil {
+ session.logger.Warning("Failed to await schema agreement,
proceeding with schema refresh",
+ NewLogFieldError("err", awaitErr))
+ }
+ var err error
+ var keyspaceMeta map[string]*KeyspaceMetadata
+ // query the system keyspace for schema data
+ keyspaceStart := time.Now()
+ keyspaces, err := getAllKeyspaceMetadata(session)
+ if err != nil {
+ refreshErr = fmt.Errorf("failed to retrieve keyspace metadata:
%w", err)
+ return refreshErr
+ }
+ keyspaceElapsed := time.Since(keyspaceStart)
+ session.logger.Debug("Keyspace metadata fetch completed",
+ NewLogFieldString("duration", keyspaceElapsed.String()))
+ var tables map[string][]TableMetadata
+ var columns map[string][]ColumnMetadata
+ var functions map[string][]FunctionMetadata
+ var aggregates map[string][]AggregateMetadata
+ var userTypes map[string][]UserTypeMetadata
+ var materializedViews map[string][]MaterializedViewMetadata
+ if session.cfg.MetadataCacheMode == Full {
+ tables, err = getAllTablesMetadata(session)
+ if err != nil {
+ refreshErr = fmt.Errorf("failed to retrieve table
metadata: %w", err)
+ return refreshErr
+ }
+ columns, err = getAllColumnMetadata(session)
+ if err != nil {
+ refreshErr = fmt.Errorf("failed to retrieve column
metadata: %w", err)
+ return refreshErr
+ }
+ functions, err = getAllFunctionsMetadata(session)
+ if err != nil {
+ refreshErr = fmt.Errorf("failed to retrieve function
metadata: %w", err)
+ return refreshErr
+ }
+ aggregates, err = getAllAggregatesMetadata(session)
+ if err != nil {
+ refreshErr = fmt.Errorf("failed to retrieve aggregate
metadata: %w", err)
+ return refreshErr
+ }
+ userTypes, err = getAllUserTypeMetadata(session)
+ if err != nil {
+ refreshErr = fmt.Errorf("failed to retrieve user type
metadata: %w", err)
+ return refreshErr
+ }
+ materializedViews, err =
getAllMaterializedViewsMetadata(session)
+ if err != nil {
+ refreshErr = fmt.Errorf("failed to retrieve
materialized view metadata: %w", err)
+ return refreshErr
+ }
+ }
+
+ // organize the schema data
+ keyspaceMeta = make(map[string]*KeyspaceMetadata)
+ sd := session.schemaDescriber
+ meta := sd.getSchemaMetaForUpdate()
+ oldKeyspaceMeta := meta.keyspaceMeta
+ var newKeyspaces []string
+ var updatedKeyspaces []string
+ var droppedKeyspaces []string
+ for keyspaceName, keyspace := range keyspaces {
+ if session.cfg.MetadataCacheMode == Full {
+ compileMetadata(session,
+ keyspace,
+ tables[keyspaceName],
+ columns[keyspaceName],
+ functions[keyspaceName],
+ aggregates[keyspaceName],
+ userTypes[keyspaceName],
+ materializedViews[keyspaceName])
+ }
+ // update the cache
+ keyspaceMeta[keyspaceName] = keyspace
+ if _, ok := oldKeyspaceMeta[keyspaceName]; !ok {
+ newKeyspaces = append(newKeyspaces, keyspaceName)
+ } else {
+ newStrat := keyspace.placementStrategy
+ oldStrat :=
oldKeyspaceMeta[keyspaceName].placementStrategy
+ if (newStrat == nil) != (oldStrat == nil) {
+ updatedKeyspaces = append(updatedKeyspaces,
keyspaceName)
+ } else if newStrat != nil && newStrat.strategyKey() !=
oldStrat.strategyKey() {
+ updatedKeyspaces = append(updatedKeyspaces,
keyspaceName)
+ }
+ }
+ }
+ droppedKeyspaces = sd.getDroppedKeyspaces(oldKeyspaceMeta, keyspaces)
+ meta.keyspaceMeta = keyspaceMeta
+ sd.schemaMeta.Store(meta)
+ refreshCacheElapsed := time.Since(start)
+ session.logger.Debug("Schema metadata cache refresh completed",
+ NewLogFieldString("duration", refreshCacheElapsed.String()))
+ // Notify policy if it supports schema refresh notifications
+ if notifier, ok := session.policy.(schemaRefreshNotifier); ok {
+ notifier.schemaRefreshed(sd.getSchemaMetaForRead())
+ } else {
+ for _, createdKeyspace := range newKeyspaces {
+
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: createdKeyspace,
Change: SchemaChangeTypeCreated})
+ }
+ for _, droppedKeyspace := range droppedKeyspaces {
+
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: droppedKeyspace,
Change: SchemaChangeTypeDropped})
+ }
+ for _, updatedKeyspace := range updatedKeyspaces {
+
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: updatedKeyspace,
Change: SchemaChangeTypeUpdated})
+ }
+ }
+ return nil
+}
+
+func (s *schemaDescriber) debounceRefreshSchemaMetadata() {
+ s.schemaRefresher.debounce()
+}
+
+func (s *schemaDescriber) refreshSchemaMetadata() error {
+ err, ok := <-s.schemaRefresher.refreshNow()
+ if !ok {
+ return errors.New("could not refresh the schema because stop
was requested")
+ }
+
+ if err != nil {
+ return fmt.Errorf("failed to refresh schema metadata: %w", err)
+ }
return nil
}
+// getDroppedKeyspaces returns the list of keyspace names that existed in
oldKeyspaces
+// but do not exist in newKeyspaces (i.e., keyspaces that were dropped).
+func (s *schemaDescriber) getDroppedKeyspaces(oldKeyspaces, newKeyspaces
map[string]*KeyspaceMetadata) []string {
+ var dropped []string
+ for keyspaceName := range oldKeyspaces {
+ if _, exists := newKeyspaces[keyspaceName]; !exists {
+ dropped = append(dropped, keyspaceName)
+ }
+ }
+ return dropped
+}
+
// "compiles" derived information about keyspace, table, and column metadata
// for a keyspace from the basic queried metadata objects returned by
// getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively;
@@ -398,7 +580,7 @@ func compileMetadata(
} else {
validatorParsed, err := parseType(session,
col.Validator)
if err != nil {
- return err
+ return fmt.Errorf("failed to parse column
validator type for column %s.%s: %w", col.Table, col.Name, err)
}
col.Type = validatorParsed.types[0]
col.Order = ASC
@@ -434,7 +616,7 @@ func compileV2Metadata(tables []TableMetadata, session
*Session) error {
if table.KeyValidator != "" {
keyValidatorParsed, err := parseType(session,
table.KeyValidator)
if err != nil {
- return err
+ return fmt.Errorf("failed to parse key
validator type for table %s.%s: %w", table.Keyspace, table.Name, err)
}
table.PartitionKey = make([]*ColumnMetadata,
len(keyValidatorParsed.types))
} else { // Cassandra 3.x+
@@ -467,62 +649,115 @@ func componentColumnCountOfType(columns
map[string]*ColumnMetadata, kind ColumnK
// query only for the keyspace metadata for the specified keyspace from
system.schema_keyspace
func getKeyspaceMetadata(session *Session, keyspaceName string)
(*KeyspaceMetadata, error) {
- keyspace := &KeyspaceMetadata{Name: keyspaceName}
+ var stmt string
if session.useSystemSchema { // Cassandra 3.x+
- const stmt = `
- SELECT durable_writes, replication
+ stmt = `
+ SELECT keyspace_name, durable_writes, replication
FROM system_schema.keyspaces
WHERE keyspace_name = ?`
- var replication map[string]string
-
- iter := session.control.query(stmt, keyspaceName)
- if iter.NumRows() == 0 {
- return nil, ErrKeyspaceDoesNotExist
- }
- iter.Scan(&keyspace.DurableWrites, &replication)
- err := iter.Close()
- if err != nil {
- return nil, fmt.Errorf("error querying keyspace schema:
%v", err)
- }
+ } else {
+ stmt = `
+ SELECT keyspace_name, durable_writes, strategy_class,
strategy_options
+ FROM system.schema_keyspaces
+ WHERE keyspace_name = ?`
+ }
+ iter := session.control.query(stmt, keyspaceName)
+ if iter.NumRows() == 0 {
+ return nil, ErrKeyspaceDoesNotExist
+ }
+ keyspaces, err := getKeyspaceMetadataFromIter(session, iter)
+ if err != nil {
+ return nil, err
+ }
+ if err := iter.Close(); err != nil {
+ return nil, fmt.Errorf("error querying keyspaces schema: %v",
err)
+ }
+ return keyspaces[keyspaceName], nil
+}
- keyspace.StrategyClass = replication["class"]
- delete(replication, "class")
+// query for the keyspace metadata for all keyspaces, from
system_schema.keyspaces (Cassandra 3.x+) or system.schema_keyspaces
+func getAllKeyspaceMetadata(session *Session) (map[string]*KeyspaceMetadata,
error) {
+ var stmt string
+ if session.useSystemSchema { // Cassandra 3.x+
+ stmt = `
+ SELECT keyspace_name, durable_writes, replication
+ FROM system_schema.keyspaces`
- keyspace.StrategyOptions = make(map[string]interface{},
len(replication))
- for k, v := range replication {
- keyspace.StrategyOptions[k] = v
- }
} else {
+ stmt = `
+ SELECT keyspace_name, durable_writes, strategy_class,
strategy_options
+ FROM system.schema_keyspaces`
+ }
+ iter := session.control.query(stmt)
- const stmt = `
- SELECT durable_writes, strategy_class, strategy_options
- FROM system.schema_keyspaces
- WHERE keyspace_name = ?`
+ keyspaces, err := getKeyspaceMetadataFromIter(session, iter)
+ if err != nil {
+ return nil, err
+ }
+ if err := iter.Close(); err != nil {
+ return nil, fmt.Errorf("error querying keyspaces schema: %v",
err)
+ }
- var strategyOptionsJSON []byte
+ return keyspaces, nil
+}
+
+func getKeyspaceMetadataFromIter(session *Session, iter *Iter)
(map[string]*KeyspaceMetadata, error) {
+ keyspaces := make(map[string]*KeyspaceMetadata)
+ if session.useSystemSchema { // Cassandra 3.x+
+
+ var (
+ keyspaceName string
+ durableWrites bool
+ replication map[string]string
+ )
+
+ for iter.Scan(&keyspaceName, &durableWrites, &replication) {
+ keyspace := &KeyspaceMetadata{
+ Name: keyspaceName,
+ DurableWrites: durableWrites,
+ }
+
+ keyspace.StrategyClass = replication["class"]
+ delete(replication, "class")
+
+ keyspace.StrategyOptions = make(map[string]interface{},
len(replication))
+ for k, v := range replication {
+ keyspace.StrategyOptions[k] = v
+ }
+ keyspace.placementStrategy = getStrategy(keyspace,
session.logger)
+ keyspaces[keyspaceName] = keyspace
- iter := session.control.query(stmt, keyspaceName)
- if iter.NumRows() == 0 {
- return nil, ErrKeyspaceDoesNotExist
- }
- iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass,
&strategyOptionsJSON)
- err := iter.Close()
- if err != nil {
- return nil, fmt.Errorf("error querying keyspace schema:
%v", err)
}
+ } else {
+ var (
+ keyspaceName string
+ durableWrites bool
+ strategyClass string
+ strategyOptionsJSON []byte
+ )
+ for iter.Scan(&keyspaceName, &durableWrites, &strategyClass,
&strategyOptionsJSON) {
+ keyspace := &KeyspaceMetadata{
+ Name: keyspaceName,
+ DurableWrites: durableWrites,
+ StrategyClass: strategyClass,
+ }
- err = json.Unmarshal(strategyOptionsJSON,
&keyspace.StrategyOptions)
- if err != nil {
- return nil, fmt.Errorf(
- "invalid JSON value '%s' as strategy_options
for in keyspace '%s': %v",
- strategyOptionsJSON, keyspace.Name, err,
- )
+ err := json.Unmarshal(strategyOptionsJSON,
&keyspace.StrategyOptions)
+ if err != nil {
+ return nil, fmt.Errorf(
+ "invalid JSON value '%s' as
strategy_options for in keyspace '%s': %v",
+ strategyOptionsJSON, keyspace.Name, err,
+ )
+ }
+
+ keyspace.placementStrategy = getStrategy(keyspace,
session.logger)
+ keyspaces[keyspaceName] = keyspace
}
}
- return keyspace, nil
+ return keyspaces, nil
}
// query for only the table metadata in the specified keyspace from
system.schema_columnfamilies
@@ -603,11 +838,81 @@ func getTableMetadata(session *Session, keyspaceName
string) ([]TableMetadata, e
return tables, nil
}
+// query for the table metadata for all tables, from system_schema.tables
and system_schema.views (Cassandra 3.x+) or system.schema_columnfamilies
+func getAllTablesMetadata(session *Session) (map[string][]TableMetadata,
error) {
+ var (
+ iter *Iter
+ scan func(iter *Iter, table *TableMetadata) bool
+ stmt string
+ )
+ if session.useSystemSchema { // Cassandra 3.x+
+ stmt = `
+ SELECT
+ keyspace_name, table_name
+ FROM system_schema.tables`
+
+ switchIter := func() *Iter {
+ iter.Close()
+ stmt = `
+ SELECT
+ keyspace_name, view_name
+ FROM system_schema.views`
+ iter = session.control.query(stmt)
+ return iter
+ }
+ scan = func(iter *Iter, table *TableMetadata) bool {
+ r := iter.Scan(
+ &table.Keyspace,
+ &table.Name,
+ )
+ if !r {
+ iter = switchIter()
+ if iter != nil {
+ switchIter = func() *Iter { return nil }
+ r = iter.Scan(&table.Keyspace,
&table.Name)
+ }
+ }
+ return r
+ }
+ } else {
+ stmt = `
+ SELECT
+ keyspace_name,
+ columnfamily_name,
+ key_validator,
+ comparator,
+ default_validator
+ FROM system.schema_columnfamilies`
+
+ scan = func(iter *Iter, table *TableMetadata) bool {
+ return iter.Scan(
+ &table.Keyspace,
+ &table.Name,
+ &table.KeyValidator,
+ &table.Comparator,
+ &table.DefaultValidator,
+ )
+ }
+ }
+ iter = session.control.query(stmt)
+ tablesByKeyspace := make(map[string][]TableMetadata)
+ table := TableMetadata{}
+ for scan(iter, &table) {
+ tablesByKeyspace[table.Keyspace] =
append(tablesByKeyspace[table.Keyspace], table)
+ table = TableMetadata{}
+ }
+ err := iter.Close()
+ if err != nil && err != ErrNotFound {
+ return nil, fmt.Errorf("error querying table schema: %v", err)
+ }
+
+ return tablesByKeyspace, nil
+}
+
func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata,
error) {
- // V1 does not support the type column, and all returned rows are
- // of kind "regular".
const stmt = `
SELECT
+ keyspace_name,
columnfamily_name,
column_name,
component_index,
@@ -617,20 +922,49 @@ func (s *Session) scanColumnMetadataV1(keyspace string)
([]ColumnMetadata, error
index_options
FROM system.schema_columns
WHERE keyspace_name = ?`
+ itr := s.control.query(stmt, keyspace)
+ columns, err := scanColumnMetadataV1FromIter(itr)
+ if err != nil {
+ return nil, err
+ }
- var columns []ColumnMetadata
+ return columns[keyspace], nil
+}
- rows := s.control.query(stmt, keyspace).Scanner()
+func (s *Session) scanAllColumnMetadataV1() (map[string][]ColumnMetadata,
error) {
+ const stmt = `
+ SELECT
+ keyspace_name,
+ columnfamily_name,
+ column_name,
+ component_index,
+ validator,
+ index_name,
+ index_type,
+ index_options
+ FROM system.schema_columns`
+ itr := s.control.query(stmt)
+ return scanColumnMetadataV1FromIter(itr)
+}
+
+func scanColumnMetadataV1FromIter(iter *Iter) (map[string][]ColumnMetadata,
error) {
+ // V1 does not support the type column, and all returned rows are
+ // of kind "regular".
+
+ var columns = make(map[string][]ColumnMetadata)
+
+ rows := iter.Scanner()
for rows.Next() {
var (
- column = ColumnMetadata{Keyspace: keyspace}
+ column = ColumnMetadata{}
indexOptionsJSON []byte
)
// all columns returned by V1 are regular
column.Kind = ColumnRegular
- err := rows.Scan(&column.Table,
+ err := rows.Scan(&column.Keyspace,
+ &column.Table,
&column.Name,
&column.ComponentIndex,
&column.Validator,
@@ -654,7 +988,7 @@ func (s *Session) scanColumnMetadataV1(keyspace string)
([]ColumnMetadata, error
}
}
- columns = append(columns, column)
+ columns[column.Keyspace] = append(columns[column.Keyspace],
column)
}
if err := rows.Err(); err != nil {
@@ -668,6 +1002,7 @@ func (s *Session) scanColumnMetadataV2(keyspace string)
([]ColumnMetadata, error
// V2+ supports the type column
const stmt = `
SELECT
+ keyspace_name,
columnfamily_name,
column_name,
component_index,
@@ -679,16 +1014,48 @@ func (s *Session) scanColumnMetadataV2(keyspace string)
([]ColumnMetadata, error
FROM system.schema_columns
WHERE keyspace_name = ?`
- var columns []ColumnMetadata
+ iter := s.control.query(stmt, keyspace)
+ columns, err := scanColumnMetadataV2FromIter(iter)
+ if err != nil {
+ return nil, err
+ }
+ return columns[keyspace], nil
+}
+
+func (s *Session) scanAllColumnMetadataV2() (map[string][]ColumnMetadata,
error) {
+ // V2+ supports the type column
+ const stmt = `
+ SELECT
+ keyspace_name,
+ columnfamily_name,
+ column_name,
+ component_index,
+ validator,
+ index_name,
+ index_type,
+ index_options,
+ type
+ FROM system.schema_columns`
+
+ iter := s.control.query(stmt)
+ columns, err := scanColumnMetadataV2FromIter(iter)
+ if err != nil {
+ return nil, err
+ }
+ return columns, nil
+}
- rows := s.control.query(stmt, keyspace).Scanner()
+func scanColumnMetadataV2FromIter(iter *Iter) (map[string][]ColumnMetadata,
error) {
+ rows := iter.Scanner()
+ columns := make(map[string][]ColumnMetadata)
for rows.Next() {
var (
- column = ColumnMetadata{Keyspace: keyspace}
+ column = ColumnMetadata{}
indexOptionsJSON []byte
)
- err := rows.Scan(&column.Table,
+ err := rows.Scan(&column.Keyspace,
+ &column.Table,
&column.Name,
&column.ComponentIndex,
&column.Validator,
@@ -697,11 +1064,9 @@ func (s *Session) scanColumnMetadataV2(keyspace string)
([]ColumnMetadata, error
&indexOptionsJSON,
&column.Kind,
)
-
if err != nil {
return nil, err
}
-
if len(indexOptionsJSON) > 0 {
err := json.Unmarshal(indexOptionsJSON,
&column.Index.Options)
if err != nil {
@@ -713,21 +1078,19 @@ func (s *Session) scanColumnMetadataV2(keyspace string)
([]ColumnMetadata, error
err)
}
}
-
- columns = append(columns, column)
+ columns[column.Keyspace] = append(columns[column.Keyspace],
column)
}
-
if err := rows.Err(); err != nil {
return nil, err
}
return columns, nil
-
}
func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata,
error) {
const stmt = `
SELECT
+ keyspace_name,
table_name,
column_name,
clustering_order,
@@ -737,13 +1100,50 @@ func (s *Session) scanColumnMetadataSystem(keyspace
string) ([]ColumnMetadata, e
FROM system_schema.columns
WHERE keyspace_name = ?`
- var columns []ColumnMetadata
+ var iter = s.control.query(stmt, keyspace)
+
+ columns, err := scanColumnMetadataSystemFromIter(iter)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return columns[keyspace], nil
+}
+
+func (s *Session) scanAllColumnMetadataSystem() (map[string][]ColumnMetadata,
error) {
+ const stmt = `
+ SELECT
+ keyspace_name,
+ table_name,
+ column_name,
+ clustering_order,
+ type,
+ kind,
+ position
+ FROM system_schema.columns`
+
+ var iter = s.control.query(stmt)
- rows := s.control.query(stmt, keyspace).Scanner()
+ columns, err := scanColumnMetadataSystemFromIter(iter)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return columns, nil
+}
+
+func scanColumnMetadataSystemFromIter(iter *Iter)
(map[string][]ColumnMetadata, error) {
+
+ var columns = make(map[string][]ColumnMetadata)
+
+ rows := iter.Scanner()
for rows.Next() {
- column := ColumnMetadata{Keyspace: keyspace}
+ column := ColumnMetadata{}
- err := rows.Scan(&column.Table,
+ err := rows.Scan(&column.Keyspace,
+ &column.Table,
&column.Name,
&column.ClusteringOrder,
&column.Validator,
@@ -755,7 +1155,7 @@ func (s *Session) scanColumnMetadataSystem(keyspace
string) ([]ColumnMetadata, e
return nil, err
}
- columns = append(columns, column)
+ columns[column.Keyspace] = append(columns[column.Keyspace],
column)
}
if err := rows.Err(); err != nil {
@@ -790,6 +1190,29 @@ func getColumnMetadata(session *Session, keyspaceName
string) ([]ColumnMetadata,
return columns, nil
}
+// query for the column metadata for all keyspaces, from system_schema.columns (Cassandra 3.x+) or system.schema_columns
+func getAllColumnMetadata(session *Session) (map[string][]ColumnMetadata,
error) {
+ var (
+ columns map[string][]ColumnMetadata
+ err error
+ )
+
+ // Deal with differences in protocol versions
+ if session.cfg.ProtoVersion == 1 {
+ columns, err = session.scanAllColumnMetadataV1()
+ } else if session.useSystemSchema { // Cassandra 3.x+
+ columns, err = session.scanAllColumnMetadataSystem()
+ } else {
+ columns, err = session.scanAllColumnMetadataV2()
+ }
+
+ if err != nil && err != ErrNotFound {
+ return nil, fmt.Errorf("error querying column schema: %v", err)
+ }
+
+ return columns, nil
+}
+
func getUserTypeMetadata(session *Session, keyspaceName string)
([]UserTypeMetadata, error) {
var tableName string
if session.useSystemSchema {
@@ -799,19 +1222,56 @@ func getUserTypeMetadata(session *Session, keyspaceName
string) ([]UserTypeMetad
}
stmt := fmt.Sprintf(`
SELECT
+ keyspace_name,
type_name,
field_names,
field_types
FROM %s
WHERE keyspace_name = ?`, tableName)
- var uTypes []UserTypeMetadata
+ iter := session.control.query(stmt, keyspaceName)
+ uTypes, err := getUserTypeMetadataFromIter(session, iter)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return uTypes[keyspaceName], nil
+}
+
+func getAllUserTypeMetadata(session *Session) (map[string][]UserTypeMetadata,
error) {
+ var tableName string
+ if session.useSystemSchema {
+ tableName = "system_schema.types"
+ } else {
+ tableName = "system.schema_usertypes"
+ }
+ stmt := fmt.Sprintf(`
+ SELECT
+ keyspace_name,
+ type_name,
+ field_names,
+ field_types
+ FROM %s`, tableName)
+
+ iter := session.control.query(stmt)
+ uTypes, err := getUserTypeMetadataFromIter(session, iter)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return uTypes, nil
+}
- rows := session.control.query(stmt, keyspaceName).Scanner()
+func getUserTypeMetadataFromIter(session *Session, iter *Iter)
(map[string][]UserTypeMetadata, error) {
+ uTypes := make(map[string][]UserTypeMetadata)
+ rows := iter.Scanner()
for rows.Next() {
- uType := UserTypeMetadata{Keyspace: keyspaceName}
+ uType := UserTypeMetadata{}
var argumentTypes []string
- err := rows.Scan(&uType.Name,
+ err := rows.Scan(&uType.Keyspace,
+ &uType.Name,
&uType.FieldNames,
&argumentTypes,
)
@@ -827,7 +1287,7 @@ func getUserTypeMetadata(session *Session, keyspaceName
string) ([]UserTypeMetad
uType.FieldTypes[i] =
unknownTypeInfo(argumentType)
}
}
- uTypes = append(uTypes, uType)
+ uTypes[uType.Keyspace] = append(uTypes[uType.Keyspace], uType)
}
// TODO: if a UDT refers to another UDT, should we resolve it?
@@ -997,23 +1457,25 @@ func materializedViewMetadataFromMap(currentObject
map[string]interface{}, mater
return nil
}
-func parseSystemSchemaViews(iter *Iter) ([]MaterializedViewMetadata, error) {
- var materializedViews []MaterializedViewMetadata
- s, err := iter.SliceMap()
- if err != nil {
- return nil, err
- }
-
- for _, row := range s {
+func parseSystemSchemaViews(iter *Iter)
(map[string][]MaterializedViewMetadata, error) {
+ var materializedViews = make(map[string][]MaterializedViewMetadata)
+ numCols := len(iter.Columns())
+ for {
+ row := make(map[string]interface{}, numCols)
+ if !iter.MapScan(row) {
+ break
+ }
var materializedView MaterializedViewMetadata
- err = materializedViewMetadataFromMap(row, &materializedView)
+ err := materializedViewMetadataFromMap(row, &materializedView)
if err != nil {
return nil, err
}
- materializedViews = append(materializedViews, materializedView)
+ materializedViews[materializedView.Keyspace] =
append(materializedViews[materializedView.Keyspace], materializedView)
+ }
+ if iter.err != nil {
+ return nil, iter.err
}
-
return materializedViews, nil
}
@@ -1027,7 +1489,7 @@ func getMaterializedViewsMetadata(session *Session,
keyspaceName string) ([]Mate
FROM %s
WHERE keyspace_name = ?`, tableName)
- var materializedViews []MaterializedViewMetadata
+ var materializedViews map[string][]MaterializedViewMetadata
iter := session.control.query(stmt, keyspaceName)
@@ -1036,6 +1498,27 @@ func getMaterializedViewsMetadata(session *Session,
keyspaceName string) ([]Mate
return nil, err
}
+ return materializedViews[keyspaceName], nil
+}
+
+func getAllMaterializedViewsMetadata(session *Session)
(map[string][]MaterializedViewMetadata, error) {
+ if !session.useSystemSchema {
+ return nil, nil
+ }
+ var tableName = "system_schema.views"
+ stmt := fmt.Sprintf(`
+ SELECT *
+ FROM %s`, tableName)
+
+ var materializedViews map[string][]MaterializedViewMetadata
+
+ iter := session.control.query(stmt)
+
+ materializedViews, err := parseSystemSchemaViews(iter)
+ if err != nil {
+ return nil, err
+ }
+
return materializedViews, nil
}
@@ -1051,6 +1534,7 @@ func getFunctionsMetadata(session *Session, keyspaceName
string) ([]FunctionMeta
}
stmt := fmt.Sprintf(`
SELECT
+ keyspace_name,
function_name,
argument_types,
argument_names,
@@ -1061,14 +1545,57 @@ func getFunctionsMetadata(session *Session,
keyspaceName string) ([]FunctionMeta
FROM %s
WHERE keyspace_name = ?`, tableName)
- var functions []FunctionMetadata
+ iter := session.control.query(stmt, keyspaceName)
+ functions, err := getFunctionsMetadataFromIter(session, iter)
- rows := session.control.query(stmt, keyspaceName).Scanner()
+ if err != nil {
+ return nil, err
+ }
+
+ return functions[keyspaceName], nil
+}
+
+func getAllFunctionsMetadata(session *Session) (map[string][]FunctionMetadata,
error) {
+ if !session.hasAggregatesAndFunctions {
+ return nil, nil
+ }
+ var tableName string
+ if session.useSystemSchema {
+ tableName = "system_schema.functions"
+ } else {
+ tableName = "system.schema_functions"
+ }
+ stmt := fmt.Sprintf(`
+ SELECT
+ keyspace_name,
+ function_name,
+ argument_types,
+ argument_names,
+ body,
+ called_on_null_input,
+ language,
+ return_type
+ FROM %s`, tableName)
+
+ iter := session.control.query(stmt)
+ functions, err := getFunctionsMetadataFromIter(session, iter)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return functions, nil
+}
+
+func getFunctionsMetadataFromIter(session *Session, iter *Iter)
(map[string][]FunctionMetadata, error) {
+ functions := make(map[string][]FunctionMetadata)
+ rows := iter.Scanner()
for rows.Next() {
- function := FunctionMetadata{Keyspace: keyspaceName}
+ function := FunctionMetadata{}
var argumentTypes []string
var returnType string
- err := rows.Scan(&function.Name,
+ err := rows.Scan(&function.Keyspace,
+ &function.Name,
&argumentTypes,
&function.ArgumentNames,
&function.Body,
@@ -1094,13 +1621,11 @@ func getFunctionsMetadata(session *Session,
keyspaceName string) ([]FunctionMeta
function.ArgumentTypes[i] =
unknownTypeInfo(argumentType)
}
}
- functions = append(functions, function)
+ functions[function.Keyspace] =
append(functions[function.Keyspace], function)
}
-
if err := rows.Err(); err != nil {
return nil, err
}
-
return functions, nil
}
@@ -1117,6 +1642,7 @@ func getAggregatesMetadata(session *Session, keyspaceName
string) ([]AggregateMe
stmt := fmt.Sprintf(`
SELECT
+ keyspace_name,
aggregate_name,
argument_types,
final_func,
@@ -1127,15 +1653,59 @@ func getAggregatesMetadata(session *Session,
keyspaceName string) ([]AggregateMe
FROM %s
WHERE keyspace_name = ?`, tableName)
- var aggregates []AggregateMetadata
+ iter := session.control.query(stmt, keyspaceName)
+ aggregates, err := getAggregatesMetadataFromIter(session, iter)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return aggregates[keyspaceName], nil
+}
+
+func getAllAggregatesMetadata(session *Session)
(map[string][]AggregateMetadata, error) {
+ if !session.hasAggregatesAndFunctions {
+ return nil, nil
+ }
+ var tableName string
+ if session.useSystemSchema {
+ tableName = "system_schema.aggregates"
+ } else {
+ tableName = "system.schema_aggregates"
+ }
+
+ stmt := fmt.Sprintf(`
+ SELECT
+ keyspace_name,
+ aggregate_name,
+ argument_types,
+ final_func,
+ initcond,
+ return_type,
+ state_func,
+ state_type
+ FROM %s`, tableName)
+
+ iter := session.control.query(stmt)
+ aggregates, err := getAggregatesMetadataFromIter(session, iter)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return aggregates, nil
+}
- rows := session.control.query(stmt, keyspaceName).Scanner()
+func getAggregatesMetadataFromIter(session *Session, iter *Iter)
(map[string][]AggregateMetadata, error) {
+ aggregates := make(map[string][]AggregateMetadata)
+ rows := iter.Scanner()
for rows.Next() {
- aggregate := AggregateMetadata{Keyspace: keyspaceName}
+ aggregate := AggregateMetadata{}
var argumentTypes []string
var returnType string
var stateType string
- err := rows.Scan(&aggregate.Name,
+ err := rows.Scan(&aggregate.Keyspace,
+ &aggregate.Name,
&argumentTypes,
&aggregate.finalFunc,
&aggregate.InitCond,
@@ -1167,7 +1737,7 @@ func getAggregatesMetadata(session *Session, keyspaceName
string) ([]AggregateMe
aggregate.ArgumentTypes[i] =
unknownTypeInfo(argumentType)
}
}
- aggregates = append(aggregates, aggregate)
+ aggregates[aggregate.Keyspace] =
append(aggregates[aggregate.Keyspace], aggregate)
}
if err := rows.Err(); err != nil {
diff --git a/policies.go b/policies.go
index 5fc61d78..1736cd2b 100644
--- a/policies.go
+++ b/policies.go
@@ -324,6 +324,22 @@ type HostSelectionPolicy interface {
Pick(statement ExecutableStatement) NextHost
}
+// schemaRefreshNotifier is an optional interface that can be implemented by
HostSelectionPolicy
+// to receive notifications when schema metadata has been refreshed.
+//
+// When a policy implements this interface, it will receive the complete
schema metadata
+// via schemaRefreshed() instead of individual KeyspaceChanged() events for
each keyspace
+// that was created, dropped, or updated. This allows policies to perform
batch updates
+// or optimizations when processing schema changes.
+//
+// This is particularly useful for policies like TokenAwareHostPolicy that
need to update
+// their internal replica mappings based on keyspace metadata changes.
+type schemaRefreshNotifier interface {
+ // schemaRefreshed is called after schema metadata has been
successfully refreshed.
+ // The meta parameter contains the complete, updated schema metadata
for all keyspaces.
+ schemaRefreshed(meta *schemaMeta)
+}
+
// SelectedHost is an interface returned when picking a host from a host
// selection policy.
type SelectedHost interface {
@@ -414,7 +430,7 @@ func TokenAwareHostPolicy(fallback HostSelectionPolicy,
opts ...func(*tokenAware
// so fields should not be modified in-place. Instead, to modify a field a
copy of the field should be made
// and the pointer in clusterMeta updated to point to the new value.
type clusterMeta struct {
- // replicas is map[keyspace]map[token]hosts
+ // replicas is map[strategyKey]map[token]hosts
replicas map[string]tokenRingReplicas
tokenRing *tokenRing
}
@@ -423,6 +439,7 @@ type tokenAwareHostPolicy struct {
fallback HostSelectionPolicy
getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
getKeyspaceName func() string
+ getSchemaMeta func() *schemaMeta
shuffleReplicas bool
nonLocalReplicasFallback bool
@@ -447,6 +464,7 @@ func (t *tokenAwareHostPolicy) Init(s *Session) {
}
t.getKeyspaceMetadata = s.KeyspaceMetadata
t.getKeyspaceName = func() string { return s.cfg.Keyspace }
+ t.getSchemaMeta = s.schemaDescriber.getSchemaMetaForRead
t.logger = s.logger
}
@@ -455,35 +473,65 @@ func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo)
bool {
}
func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {
- t.mu.Lock()
- defer t.mu.Unlock()
- meta := t.getMetadataForUpdate()
- t.updateReplicas(meta, update.Keyspace)
- t.metadata.Store(meta)
+ if update.Change != SchemaChangeTypeDropped {
+ t.updateReplicas(update.Keyspace)
+ }
}
-// updateReplicas updates replicas in clusterMeta.
-// It must be called with t.mu mutex locked.
+// updateReplicas updates replicas in clusterMeta for keyspace schema changes.
// meta must not be nil and its replicas field will be updated.
-func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, keyspace
string) {
- newReplicas := make(map[string]tokenRingReplicas, len(meta.replicas))
-
+func (t *tokenAwareHostPolicy) updateReplicas(keyspace string) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ meta := t.getMetadataReadOnly()
ks, err := t.getKeyspaceMetadata(keyspace)
if err == nil {
- strat := getStrategy(ks, t.logger)
+ strat := ks.placementStrategy
if strat != nil {
if meta != nil && meta.tokenRing != nil {
- newReplicas[keyspace] =
strat.replicaMap(meta.tokenRing)
+ key := strat.strategyKey()
+ // Only add replica map if strategy key doesn't
exist yet.
+ // Multiple keyspaces with identical
replication strategies share the same replica map.
+ if _, ok := meta.replicas[key]; !ok {
+ metaUpdate := t.getMetadataForUpdate()
+ newReplicas :=
make(map[string]tokenRingReplicas, len(meta.replicas))
+ newReplicas[key] =
strat.replicaMap(metaUpdate.tokenRing)
+ for k, replicas := range
metaUpdate.replicas {
+ newReplicas[k] = replicas
+ }
+ metaUpdate.replicas = newReplicas
+ t.metadata.Store(metaUpdate)
+ }
}
}
}
+}
+
+func (t *tokenAwareHostPolicy) schemaRefreshed(schemaMeta *schemaMeta) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ meta := t.getMetadataForUpdate()
+ t.updateAllReplicas(meta, schemaMeta)
+ t.metadata.Store(meta)
+}
- for ks, replicas := range meta.replicas {
- if ks != keyspace {
- newReplicas[ks] = replicas
+// updateAllReplicas updates replicas in clusterMeta for schema/topology
changes.
+// It must be called with t.mu mutex locked.
+// meta must not be nil and its replicas field will be updated.
+func (t *tokenAwareHostPolicy) updateAllReplicas(meta *clusterMeta, schemaMeta
*schemaMeta) {
+ schema := schemaMeta.keyspaceMeta
+ newReplicas := make(map[string]tokenRingReplicas, len(schema))
+ for _, metadata := range schema {
+ strat := metadata.placementStrategy
+ if strat != nil {
+ if meta != nil && meta.tokenRing != nil {
+ key := strat.strategyKey()
+ if _, ok := newReplicas[key]; !ok {
+ newReplicas[key] =
strat.replicaMap(meta.tokenRing)
+ }
+ }
}
}
-
meta.replicas = newReplicas
}
@@ -496,7 +544,9 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner
string) {
t.partitioner = partitioner
meta := t.getMetadataForUpdate()
meta.resetTokenRing(t.partitioner, t.hosts.get(), t.logger)
- t.updateReplicas(meta, t.getKeyspaceName())
+ if t.getSchemaMeta != nil {
+ t.updateAllReplicas(meta, t.getSchemaMeta())
+ }
t.metadata.Store(meta)
}
}
@@ -506,7 +556,9 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
if t.hosts.add(host) {
meta := t.getMetadataForUpdate()
meta.resetTokenRing(t.partitioner, t.hosts.get(), t.logger)
- t.updateReplicas(meta, t.getKeyspaceName())
+ if t.getSchemaMeta != nil {
+ t.updateAllReplicas(meta, t.getSchemaMeta())
+ }
t.metadata.Store(meta)
}
t.mu.Unlock()
@@ -523,7 +575,9 @@ func (t *tokenAwareHostPolicy) AddHosts(hosts []*HostInfo) {
meta := t.getMetadataForUpdate()
meta.resetTokenRing(t.partitioner, t.hosts.get(), t.logger)
- t.updateReplicas(meta, t.getKeyspaceName())
+ if t.getSchemaMeta != nil {
+ t.updateAllReplicas(meta, t.getSchemaMeta())
+ }
t.metadata.Store(meta)
t.mu.Unlock()
@@ -538,7 +592,9 @@ func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
if t.hosts.remove(host.ConnectAddress()) {
meta := t.getMetadataForUpdate()
meta.resetTokenRing(t.partitioner, t.hosts.get(), t.logger)
- t.updateReplicas(meta, t.getKeyspaceName())
+ if t.getSchemaMeta != nil {
+ t.updateAllReplicas(meta, t.getSchemaMeta())
+ }
t.metadata.Store(meta)
}
t.mu.Unlock()
@@ -612,7 +668,15 @@ func (t *tokenAwareHostPolicy) Pick(qry
ExecutableStatement) NextHost {
}
token := meta.tokenRing.partitioner.Hash(routingKey)
- ht := meta.replicas[qry.Keyspace()].replicasFor(token)
+ var ht *hostTokens
+ if t.getSchemaMeta != nil {
+ if ksMeta, ok :=
t.getSchemaMeta().keyspaceMeta[qry.Keyspace()]; ok {
+ strategy := ksMeta.placementStrategy
+ if strategy != nil {
+ ht =
meta.replicas[strategy.strategyKey()].replicasFor(token)
+ }
+ }
+ }
var replicas []*HostInfo
if ht == nil {
diff --git a/policies_test.go b/policies_test.go
index 62b3ad12..3dcf5c88 100644
--- a/policies_test.go
+++ b/policies_test.go
@@ -32,8 +32,6 @@
package gocql
import (
- "errors"
- "fmt"
"net"
"sort"
"strings"
@@ -75,10 +73,23 @@ func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T)
{
policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return keyspace }
- policyInternal.getKeyspaceMetadata = func(ks string)
(*KeyspaceMetadata, error) {
- return nil, errors.New("not initalized")
+ keyspaceMeta := &KeyspaceMetadata{
+ Name: keyspace,
+ StrategyClass: "SimpleStrategy",
+ StrategyOptions: map[string]interface{}{
+ "class": "SimpleStrategy",
+ "replication_factor": 2,
+ },
+ }
+ strategy := getStrategy(keyspaceMeta, nopLoggerSingleton)
+ keyspaceMeta.placementStrategy = strategy
+ policyInternal.getSchemaMeta = func() *schemaMeta {
+ return &schemaMeta{
+ keyspaceMeta: map[string]*KeyspaceMetadata{
+ keyspace: keyspaceMeta,
+ },
+ }
}
-
query := &Query{}
query.getKeyspace = func() string { return keyspace }
@@ -104,25 +115,10 @@ func TestHostPolicy_TokenAware_SimpleStrategy(t
*testing.T) {
policy.SetPartitioner("OrderedPartitioner")
- policyInternal.getKeyspaceMetadata = func(keyspaceName string)
(*KeyspaceMetadata, error) {
- if keyspaceName != keyspace {
- return nil, fmt.Errorf("unknown keyspace: %s",
keyspaceName)
- }
- return &KeyspaceMetadata{
- Name: keyspace,
- StrategyClass: "SimpleStrategy",
- StrategyOptions: map[string]interface{}{
- "class": "SimpleStrategy",
- "replication_factor": 2,
- },
- }, nil
- }
- policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace})
-
// The SimpleStrategy above should generate the following replicas.
// It's handy to have as reference here.
assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
- "myKeyspace": {
+ strategy.strategyKey(): {
{orderedToken("00"), []*HostInfo{hosts[0], hosts[1]}},
{orderedToken("25"), []*HostInfo{hosts[1], hosts[2]}},
{orderedToken("50"), []*HostInfo{hosts[2], hosts[3]}},
@@ -170,9 +166,6 @@ func TestHostPolicy_TokenAware_NilHostInfo(t *testing.T) {
policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
- policyInternal.getKeyspaceMetadata = func(ks string)
(*KeyspaceMetadata, error) {
- return nil, errors.New("not initialized")
- }
hosts := [...]*HostInfo{
{connectAddress: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
@@ -460,9 +453,6 @@ func TestHostPolicy_TokenAware(t *testing.T) {
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return keyspace }
- policyInternal.getKeyspaceMetadata = func(ks string)
(*KeyspaceMetadata, error) {
- return nil, errors.New("not initialized")
- }
query := &Query{}
query.getKeyspace = func() string { return keyspace }
@@ -506,29 +496,32 @@ func TestHostPolicy_TokenAware(t *testing.T) {
t.Fatal("expected to get host from fallback got nil")
}
- policy.SetPartitioner("OrderedPartitioner")
-
- policyInternal.getKeyspaceMetadata = func(keyspaceName string)
(*KeyspaceMetadata, error) {
- if keyspaceName != keyspace {
- return nil, fmt.Errorf("unknown keyspace: %s",
keyspaceName)
- }
- return &KeyspaceMetadata{
- Name: keyspace,
- StrategyClass: "NetworkTopologyStrategy",
- StrategyOptions: map[string]interface{}{
- "class": "NetworkTopologyStrategy",
- "local": 1,
- "remote1": 1,
- "remote2": 1,
+ keyspaceMeta := &KeyspaceMetadata{
+ Name: keyspace,
+ StrategyClass: "NetworkTopologyStrategy",
+ StrategyOptions: map[string]interface{}{
+ "class": "NetworkTopologyStrategy",
+ "local": 1,
+ "remote1": 1,
+ "remote2": 1,
+ },
+ }
+ strategy := getStrategy(keyspaceMeta, nopLoggerSingleton)
+ keyspaceMeta.placementStrategy = strategy
+ policyInternal.getSchemaMeta = func() *schemaMeta {
+ return &schemaMeta{
+ keyspaceMeta: map[string]*KeyspaceMetadata{
+ keyspace: keyspaceMeta,
},
- }, nil
+ }
}
- policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
+
+ policy.SetPartitioner("OrderedPartitioner")
// The NetworkTopologyStrategy above should generate the following
replicas.
// It's handy to have as reference here.
assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
- "myKeyspace": {
+ strategy.strategyKey(): {
{orderedToken("05"), []*HostInfo{hosts[0], hosts[1],
hosts[2]}},
{orderedToken("10"), []*HostInfo{hosts[1], hosts[2],
hosts[3]}},
{orderedToken("15"), []*HostInfo{hosts[2], hosts[3],
hosts[4]}},
@@ -562,9 +555,6 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t
*testing.T) {
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"),
NonLocalReplicasFallback())
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return keyspace }
- policyInternal.getKeyspaceMetadata = func(ks string)
(*KeyspaceMetadata, error) {
- return nil, errors.New("not initialized")
- }
query := &Query{}
query.getKeyspace = func() string { return keyspace }
@@ -597,29 +587,32 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t
*testing.T) {
policy.AddHost(host)
}
- policy.SetPartitioner("OrderedPartitioner")
-
- policyInternal.getKeyspaceMetadata = func(keyspaceName string)
(*KeyspaceMetadata, error) {
- if keyspaceName != keyspace {
- return nil, fmt.Errorf("unknown keyspace: %s",
keyspaceName)
- }
- return &KeyspaceMetadata{
- Name: keyspace,
- StrategyClass: "NetworkTopologyStrategy",
- StrategyOptions: map[string]interface{}{
- "class": "NetworkTopologyStrategy",
- "local": 2,
- "remote1": 2,
- "remote2": 2,
+ keyspaceMeta := &KeyspaceMetadata{
+ Name: keyspace,
+ StrategyClass: "NetworkTopologyStrategy",
+ StrategyOptions: map[string]interface{}{
+ "class": "NetworkTopologyStrategy",
+ "local": 2,
+ "remote1": 2,
+ "remote2": 2,
+ },
+ }
+ strategy := getStrategy(keyspaceMeta, nopLoggerSingleton)
+ keyspaceMeta.placementStrategy = strategy
+ policyInternal.getSchemaMeta = func() *schemaMeta {
+ return &schemaMeta{
+ keyspaceMeta: map[string]*KeyspaceMetadata{
+ keyspace: keyspaceMeta,
},
- }, nil
+ }
}
- policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace})
+
+ policy.SetPartitioner("OrderedPartitioner")
// The NetworkTopologyStrategy above should generate the following
replicas.
// It's handy to have as reference here.
assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
- keyspace: {
+ strategy.strategyKey(): {
{orderedToken("05"), []*HostInfo{hosts[0], hosts[1],
hosts[2], hosts[3], hosts[4], hosts[5]}},
{orderedToken("10"), []*HostInfo{hosts[1], hosts[2],
hosts[3], hosts[4], hosts[5], hosts[6]}},
{orderedToken("15"), []*HostInfo{hosts[2], hosts[3],
hosts[4], hosts[5], hosts[6], hosts[7]}},
@@ -685,9 +678,6 @@ func TestHostPolicy_TokenAware_RackAware(t *testing.T) {
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return keyspace }
- policyInternal.getKeyspaceMetadata = func(ks string)
(*KeyspaceMetadata, error) {
- return nil, errors.New("not initialized")
- }
policyWithFallbackInternal := policyWithFallback.(*tokenAwareHostPolicy)
policyWithFallbackInternal.getKeyspaceName =
policyInternal.getKeyspaceName
@@ -736,31 +726,33 @@ func TestHostPolicy_TokenAware_RackAware(t *testing.T) {
t.Fatal("expected to get host from fallback got nil")
}
- policy.SetPartitioner("OrderedPartitioner")
- policyWithFallback.SetPartitioner("OrderedPartitioner")
-
- policyInternal.getKeyspaceMetadata = func(keyspaceName string)
(*KeyspaceMetadata, error) {
- if keyspaceName != keyspace {
- return nil, fmt.Errorf("unknown keyspace: %s",
keyspaceName)
- }
- return &KeyspaceMetadata{
- Name: keyspace,
- StrategyClass: "NetworkTopologyStrategy",
- StrategyOptions: map[string]interface{}{
- "class": "NetworkTopologyStrategy",
- "local": 2,
- "remote": 2,
+ keyspaceMeta := &KeyspaceMetadata{
+ Name: keyspace,
+ StrategyClass: "NetworkTopologyStrategy",
+ StrategyOptions: map[string]interface{}{
+ "class": "NetworkTopologyStrategy",
+ "local": 2,
+ "remote": 2,
+ },
+ }
+ strategy := getStrategy(keyspaceMeta, nopLoggerSingleton)
+ keyspaceMeta.placementStrategy = strategy
+ policyInternal.getSchemaMeta = func() *schemaMeta {
+ return &schemaMeta{
+ keyspaceMeta: map[string]*KeyspaceMetadata{
+ keyspace: keyspaceMeta,
},
- }, nil
+ }
}
- policyWithFallbackInternal.getKeyspaceMetadata =
policyInternal.getKeyspaceMetadata
- policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
- policyWithFallback.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace:
"myKeyspace"})
+ policyWithFallbackInternal.getSchemaMeta = policyInternal.getSchemaMeta
+
+ policy.SetPartitioner("OrderedPartitioner")
+ policyWithFallback.SetPartitioner("OrderedPartitioner")
// The NetworkTopologyStrategy above should generate the following
replicas.
// It's handy to have as reference here.
assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
- "myKeyspace": {
+ strategy.strategyKey(): {
{orderedToken("05"), []*HostInfo{hosts[0], hosts[1],
hosts[2], hosts[3]}},
{orderedToken("10"), []*HostInfo{hosts[1], hosts[2],
hosts[3], hosts[4]}},
{orderedToken("15"), []*HostInfo{hosts[2], hosts[3],
hosts[4], hosts[5]}},
@@ -809,3 +801,301 @@ func TestHostPolicy_TokenAware_RackAware(t *testing.T) {
expectHosts(t, "non-local DC", iter, "0", "1", "4", "5", "8", "9")
expectNoMoreHosts(t, iter)
}
+
+// TestHostPolicy_TokenAware_MultiKeyspace tests that token-aware routing works
+// for queries to keyspaces other than the session's default keyspace.
+func TestHostPolicy_TokenAware_MultiKeyspace(t *testing.T) {
+ const sessionKeyspace = "ks1"
+ const otherKeyspace = "ks2"
+
+ policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
+ policyInternal := policy.(*tokenAwareHostPolicy)
+ createKeyspaceMeta := func(name string) *KeyspaceMetadata {
+ ksMeta := &KeyspaceMetadata{
+ Name: name,
+ StrategyClass: "SimpleStrategy",
+ StrategyOptions: map[string]interface{}{
+ "class": "SimpleStrategy",
+ "replication_factor": 2,
+ },
+ }
+ ksMeta.placementStrategy = getStrategy(ksMeta,
nopLoggerSingleton)
+ return ksMeta
+ }
+
+ sessionKeyspaceMeta := createKeyspaceMeta(sessionKeyspace)
+ otherKeyspaceMeta := createKeyspaceMeta(otherKeyspace)
+ policyInternal.getSchemaMeta = func() *schemaMeta {
+ return &schemaMeta{
+ keyspaceMeta: map[string]*KeyspaceMetadata{
+ sessionKeyspace: sessionKeyspaceMeta,
+ otherKeyspace: otherKeyspaceMeta,
+ },
+ }
+ }
+
+ policy.SetPartitioner("OrderedPartitioner")
+
+ // Add hosts with tokens
+ hosts := [...]*HostInfo{
+ {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens:
[]string{"00"}},
+ {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens:
[]string{"25"}},
+ {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens:
[]string{"50"}},
+ {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens:
[]string{"75"}},
+ }
+ for _, host := range &hosts {
+ policy.AddHost(host)
+ }
+
+ // Verify both keyspaces are populated after SetPartitioner
+ meta := policyInternal.getMetadataReadOnly()
+ if meta.replicas[sessionKeyspaceMeta.placementStrategy.strategyKey()]
== nil {
+ t.Fatalf("session keyspace %s not in replica map",
sessionKeyspace)
+ }
+ if meta.replicas[otherKeyspaceMeta.placementStrategy.strategyKey()] ==
nil {
+ t.Fatalf("other keyspace %s not in replica map", otherKeyspace)
+ }
+
+ t.Run("SessionKeyspace", func(t *testing.T) {
+ query := &Query{}
+ query.getKeyspace = func() string { return sessionKeyspace }
+ query.RoutingKey([]byte("20"))
+
+ iter := policy.Pick(newInternalQuery(query, nil))
+
+ // Should get token-aware hosts (token "20" → host with token
"25")
+ expectHosts(t, "session keyspace token-aware", iter, "1", "2")
+ // Then fallback to remaining hosts
+ expectHosts(t, "session keyspace fallback", iter, "0", "3")
+ expectNoMoreHosts(t, iter)
+ })
+
+ t.Run("OtherKeyspace", func(t *testing.T) {
+ query := &Query{}
+ query.getKeyspace = func() string { return otherKeyspace }
+ query.RoutingKey([]byte("60"))
+
+ iter := policy.Pick(newInternalQuery(query, nil))
+
+ // Should get token-aware hosts for otherKeyspace
+ // token "60" → host with token "75"
+ expectHosts(t, "other keyspace token-aware", iter, "3", "0")
+ // Then fallback to remaining hosts
+ expectHosts(t, "other keyspace fallback", iter, "1", "2")
+ expectNoMoreHosts(t, iter)
+ })
+}
+
+// TestHostPolicy_TokenAware_MultiKeyspace_WithShuffleReplicas tests that
+// ShuffleReplicas option works correctly with proactively populated keyspaces.
+func TestHostPolicy_TokenAware_MultiKeyspace_WithShuffleReplicas(t *testing.T)
{
+ const sessionKeyspace = "ks1"
+ const otherKeyspace = "ks2"
+
+ policy := TokenAwareHostPolicy(RoundRobinHostPolicy(),
ShuffleReplicas())
+ policyInternal := policy.(*tokenAwareHostPolicy)
+
+ createKeyspaceMeta := func(name string) *KeyspaceMetadata {
+ ksMeta := &KeyspaceMetadata{
+ Name: name,
+ StrategyClass: "SimpleStrategy",
+ StrategyOptions: map[string]interface{}{
+ "class": "SimpleStrategy",
+ "replication_factor": 2,
+ },
+ }
+ ksMeta.placementStrategy = getStrategy(ksMeta,
nopLoggerSingleton)
+ return ksMeta
+ }
+
+ sessionKeyspaceMeta := createKeyspaceMeta(sessionKeyspace)
+ otherKeyspaceMeta := createKeyspaceMeta(otherKeyspace)
+ policyInternal.getSchemaMeta = func() *schemaMeta {
+ return &schemaMeta{
+ keyspaceMeta: map[string]*KeyspaceMetadata{
+ sessionKeyspace: sessionKeyspaceMeta,
+ otherKeyspace: otherKeyspaceMeta,
+ },
+ }
+ }
+
+ hosts := [...]*HostInfo{
+ {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens:
[]string{"00"}},
+ {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens:
[]string{"25"}},
+ {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens:
[]string{"50"}},
+ {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens:
[]string{"75"}},
+ }
+ for _, host := range &hosts {
+ policy.AddHost(host)
+ }
+ policy.SetPartitioner("OrderedPartitioner")
+
+ // Query other keyspace with shuffle replicas enabled
+ query := &Query{}
+ query.getKeyspace = func() string { return otherKeyspace }
+ query.RoutingKey([]byte("20"))
+
+ // Execute Pick multiple times and collect first hosts
+ firstHosts := make(map[string]int)
+ for i := 0; i < 100; i++ {
+ iter := policy.Pick(newInternalQuery(query, nil))
+ host := iter()
+ if host != nil {
+ firstHosts[host.Info().HostID()]++
+ }
+ }
+
+ // With ShuffleReplicas, we should see distribution across replicas
+ // (not always the same host)
+ if len(firstHosts) < 2 {
+ t.Errorf("expected distribution across replicas with
ShuffleReplicas, got only %d unique first hosts", len(firstHosts))
+ }
+}
+
+// TestHostPolicy_TokenAware_TopologyChangeUpdatesAllKeyspaces verifies that
+// when hosts are added or removed, replica maps are updated for ALL keyspaces,
+// not just the session keyspace.
+func TestHostPolicy_TokenAware_TopologyChangeUpdatesAllKeyspaces(t *testing.T)
{
+ const sessionKeyspace = "ks1"
+ const otherKeyspace = "ks2"
+
+ policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
+ policyInternal := policy.(*tokenAwareHostPolicy)
+
+ createKeyspaceMeta := func(name string) *KeyspaceMetadata {
+ ksMeta := &KeyspaceMetadata{
+ Name: name,
+ StrategyClass: "SimpleStrategy",
+ StrategyOptions: map[string]interface{}{
+ "class": "SimpleStrategy",
+ "replication_factor": 2,
+ },
+ }
+ ksMeta.placementStrategy = getStrategy(ksMeta,
nopLoggerSingleton)
+ return ksMeta
+ }
+
+ sessionKeyspaceMeta := createKeyspaceMeta(sessionKeyspace)
+ otherKeyspaceMeta := createKeyspaceMeta(otherKeyspace)
+ policyInternal.getSchemaMeta = func() *schemaMeta {
+ return &schemaMeta{
+ keyspaceMeta: map[string]*KeyspaceMetadata{
+ sessionKeyspace: sessionKeyspaceMeta,
+ otherKeyspace: otherKeyspaceMeta,
+ },
+ }
+ }
+
+ // Initial topology: 3 hosts
+ initialHosts := []*HostInfo{
+ {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens:
[]string{"00"}},
+ {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens:
[]string{"33"}},
+ {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens:
[]string{"66"}},
+ }
+ for _, host := range initialHosts {
+ policy.AddHost(host)
+ }
+ policy.SetPartitioner("OrderedPartitioner")
+
+ // Verify both keyspaces are in replica map
+ meta := policyInternal.getMetadataReadOnly()
+ if meta.replicas[sessionKeyspaceMeta.placementStrategy.strategyKey()]
== nil {
+ t.Fatalf("session keyspace %s not in replica map",
sessionKeyspace)
+ }
+ if meta.replicas[otherKeyspaceMeta.placementStrategy.strategyKey()] ==
nil {
+ t.Fatalf("other keyspace %s not in replica map", otherKeyspace)
+ }
+
+ // Test: Add a new host (topology change)
+ t.Run("AddHost", func(t *testing.T) {
+ newHost := &HostInfo{
+ hostId: "3",
+ connectAddress: net.IPv4(10, 0, 0, 4),
+ tokens: []string{"99"},
+ }
+ policy.AddHost(newHost)
+
+ // Verify: Get updated metadata
+ metaAfterAdd := policyInternal.getMetadataReadOnly()
+
+ // Check session keyspace was updated
+ updatedSessionReplicas :=
metaAfterAdd.replicas[sessionKeyspaceMeta.placementStrategy.strategyKey()]
+ if updatedSessionReplicas == nil {
+ t.Fatal("session keyspace replica map is nil after
AddHost")
+ }
+
+ // Check other keyspace was updated
+ updatedOtherReplicas :=
metaAfterAdd.replicas[otherKeyspaceMeta.placementStrategy.strategyKey()]
+ if updatedOtherReplicas == nil {
+ t.Fatal("other keyspace replica map is nil after
AddHost")
+ }
+
+ // Verify replica maps include new host
+ // For session keyspace
+ sessionHasNewHost := false
+ for _, ht := range updatedSessionReplicas {
+ for _, host := range ht.hosts {
+ if host.HostID() == "3" {
+ sessionHasNewHost = true
+ break
+ }
+ }
+ }
+ if !sessionHasNewHost {
+ t.Error("session keyspace replica map does not include
new host")
+ }
+
+ // For other keyspace
+ otherHasNewHost := false
+ for _, ht := range updatedOtherReplicas {
+ for _, host := range ht.hosts {
+ if host.HostID() == "3" {
+ otherHasNewHost = true
+ break
+ }
+ }
+ }
+ if !otherHasNewHost {
+ t.Error("other keyspace replica map does not include
new host - replica map is STALE after topology change!")
+ }
+ })
+
+ // Test: Remove host
+ t.Run("RemoveHost", func(t *testing.T) {
+ // Remove one of the original hosts
+ hostToRemove := initialHosts[0]
+ policy.RemoveHost(hostToRemove)
+
+ metaAfterRemove := policyInternal.getMetadataReadOnly()
+
+ // Verify session keyspace updated
+ sessionReplicasAfterRemove :=
metaAfterRemove.replicas[sessionKeyspaceMeta.placementStrategy.strategyKey()]
+ sessionStillHasHost := false
+ for _, ht := range sessionReplicasAfterRemove {
+ for _, host := range ht.hosts {
+ if host.HostID() == "0" {
+ sessionStillHasHost = true
+ break
+ }
+ }
+ }
+ if sessionStillHasHost {
+ t.Error("session keyspace still has removed host in
replica map")
+ }
+
+ // Verify other keyspace updated
+ otherReplicasAfterRemove :=
metaAfterRemove.replicas[otherKeyspaceMeta.placementStrategy.strategyKey()]
+ otherStillHasHost := false
+ for _, ht := range otherReplicasAfterRemove {
+ for _, host := range ht.hosts {
+ if host.HostID() == "0" {
+ otherStillHasHost = true
+ break
+ }
+ }
+ }
+ if otherStillHasHost {
+ t.Error("other keyspace still has removed host in
replica map - STALE after topology change!")
+ }
+ })
+}
diff --git a/session.go b/session.go
index 264ed892..58c46b32 100644
--- a/session.go
+++ b/session.go
@@ -79,8 +79,7 @@ type Session struct {
control *controlConn
// event handlers
- nodeEvents *eventDebouncer
- schemaEvents *eventDebouncer
+ nodeEvents *eventDebouncer
// ring metadata
useSystemSchema bool
@@ -162,10 +161,11 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
s.types = cfg.RegisteredTypes.Copy()
}
- s.schemaDescriber = newSchemaDescriber(s)
+ s.schemaDescriber = newSchemaDescriber(s,
newRefreshDebouncer(schemaRefreshDebounceTime, func() error {
+ return refreshSchemas(s)
+ }))
s.nodeEvents = newEventDebouncer("NodeEvents", s.handleNodeEvent,
s.logger)
- s.schemaEvents = newEventDebouncer("SchemaEvents", s.handleSchemaEvent,
s.logger)
s.routingMetadataCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
@@ -367,8 +367,14 @@ func (s *Session) init() error {
// Invoke KeyspaceChanged to let the policy cache the session keyspace
// parameters. This is used by tokenAwareHostPolicy to discover
replicas.
- if !s.cfg.disableControlConn && s.cfg.Keyspace != "" {
- s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace:
s.cfg.Keyspace})
+ if !s.cfg.disableControlConn && s.schemaDescriber != nil {
+ err := s.schemaDescriber.refreshSchemaMetadata()
+ if err != nil {
+ s.logger.Warning("Failed to initialize schema metadata.
"+
+ "Token-aware routing will fall back to the
configured fallback policy. "+
+ "Attempts to retrieve keyspace metadata will
fail with ErrKeyspaceDoesNotExist until schema refresh succeeds.",
+ NewLogFieldError("err", err))
+ }
}
s.sessionStateMu.Lock()
@@ -522,6 +528,10 @@ func (s *Session) Close() {
s.pool.Close()
}
+ if s.schemaDescriber != nil {
+ s.schemaDescriber.schemaRefresher.stop()
+ }
+
if s.control != nil {
s.control.close()
}
@@ -530,10 +540,6 @@ func (s *Session) Close() {
s.nodeEvents.stop()
}
- if s.schemaEvents != nil {
- s.schemaEvents.stop()
- }
-
if s.ringRefresher != nil {
s.ringRefresher.stop()
}
diff --git a/topology.go b/topology.go
index e5741ff2..3ec20b15 100644
--- a/topology.go
+++ b/topology.go
@@ -68,6 +68,9 @@ func (h tokenRingReplicas) replicasFor(t token) *hostTokens {
type placementStrategy interface {
replicaMap(tokenRing *tokenRing) tokenRingReplicas
replicationFactor(dc string) int
+ // strategyKey returns a unique identifier string for this strategy
instance.
+ // Two strategy instances with identical configuration should return
the same key.
+ strategyKey() string
}
func getReplicationFactorFromOpts(val interface{}) (int, error) {
@@ -92,15 +95,15 @@ func getReplicationFactorFromOpts(val interface{}) (int,
error) {
func getStrategy(ks *KeyspaceMetadata, logger StructuredLogger)
placementStrategy {
switch {
- case strings.Contains(ks.StrategyClass, "SimpleStrategy"):
+ case strings.Contains(ks.StrategyClass, simpleStrategyClass):
rf, err :=
getReplicationFactorFromOpts(ks.StrategyOptions["replication_factor"])
if err != nil {
logger.Warning("Failed to parse replication factor of
keyspace configured with SimpleStrategy.",
NewLogFieldString("keyspace", ks.Name),
NewLogFieldError("err", err))
return nil
}
- return &simpleStrategy{rf: rf}
- case strings.Contains(ks.StrategyClass, "NetworkTopologyStrategy"):
+ return newSimpleStrategy(rf)
+ case strings.Contains(ks.StrategyClass, networkTopologyStrategyClass):
dcs := make(map[string]int)
for dc, rf := range ks.StrategyOptions {
if dc == "class" {
@@ -117,7 +120,7 @@ func getStrategy(ks *KeyspaceMetadata, logger StructuredLogger) placementStrateg
dcs[dc] = rf
}
- return &networkTopology{dcs: dcs}
+ return newNetworkTopology(dcs)
case strings.Contains(ks.StrategyClass, "LocalStrategy"):
return nil
default:
@@ -127,8 +130,32 @@ func getStrategy(ks *KeyspaceMetadata, logger StructuredLogger) placementStrateg
}
}
+const (
+ simpleStrategyClass string = "SimpleStrategy"
+ networkTopologyStrategyClass string = "NetworkTopologyStrategy"
+)
+
type simpleStrategy struct {
- rf int
+ rf int
+ key string
+}
+
+// newSimpleStrategy creates a new SimpleStrategy with pre-computed strategy key.
+// Format: {Strategy}|rf:{RF}
+// Example: SimpleStrategy|rf:3
+func newSimpleStrategy(rf int) *simpleStrategy {
+ return &simpleStrategy{
+ rf: rf,
+ key: fmt.Sprintf("%s|rf:%d", simpleStrategyClass, rf),
+ }
+}
+
+func (s *simpleStrategy) strategyClass() string {
+ return simpleStrategyClass
+}
+
+func (s *simpleStrategy) strategyKey() string {
+ return s.key
}
func (s *simpleStrategy) replicationFactor(dc string) int {
@@ -161,6 +188,53 @@ func (s *simpleStrategy) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
type networkTopology struct {
dcs map[string]int
+ key string
+}
+
+// newNetworkTopology creates a new NetworkTopologyStrategy with pre-computed strategy key.
+func newNetworkTopology(dcs map[string]int) *networkTopology {
+ return &networkTopology{
+ dcs: dcs,
+ key: buildNetworkTopologyKey(dcs),
+ }
+}
+
+func (n *networkTopology) strategyClass() string {
+ return networkTopologyStrategyClass
+}
+
+func (n *networkTopology) strategyKey() string {
+ return n.key
+}
+
+// buildNetworkTopologyKey creates a deterministic strategy key from datacenter replication factors.
+// It sorts the datacenters alphabetically to ensure consistent key generation.
+//
+// Format: {Strategy}|{LenDC}:{DC}:{RF}|{LenDC}:{DC}:{RF}|...
+// Example: NetworkTopologyStrategy|3:dc1:3|3:dc2:1
+//
+// This format prevents collisions even if datacenter names contain special characters,
+// by prepending the length of each datacenter name.
+func buildNetworkTopologyKey(dcs map[string]int) string {
+ dcNames := make([]string, 0, len(dcs))
+ for dc := range dcs {
+ dcNames = append(dcNames, dc)
+ }
+ sort.Strings(dcNames)
+
+ // Build key with length-prefixed dc names to prevent ambiguity
+ var b strings.Builder
+ b.Grow(64)
+ b.WriteString(networkTopologyStrategyClass)
+ for _, dc := range dcNames {
+ b.WriteString("|")
+ b.WriteString(strconv.Itoa(len(dc)))
+ b.WriteString(":")
+ b.WriteString(dc)
+ b.WriteString(":")
+ b.WriteString(strconv.Itoa(dcs[dc]))
+ }
+ return b.String()
}
func (n *networkTopology) replicationFactor(dc string) int {
diff --git a/topology_test.go b/topology_test.go
index 9a5cf1c1..10ee85e5 100644
--- a/topology_test.go
+++ b/topology_test.go
@@ -48,7 +48,7 @@ func TestPlacementStrategy_SimpleStrategy(t *testing.T) {
hosts := []*HostInfo{host0, host25, host50, host75}
- strat := &simpleStrategy{rf: 2}
+ strat := newSimpleStrategy(2)
tokenReplicas := strat.replicaMap(&tokenRing{hosts: hosts, tokens: tokens})
if len(tokenReplicas) != len(tokens) {
t.Fatalf("expected replica map to have %d items but has %d", len(tokens), len(tokenReplicas))
@@ -89,34 +89,28 @@ func TestPlacementStrategy_NetworkStrategy(t *testing.T) {
}{
{
name: "full",
- strat: &networkTopology{
- dcs: map[string]int{
- "dc1": 1,
- "dc2": 2,
- "dc3": 3,
- },
- },
+ strat: newNetworkTopology(map[string]int{
+ "dc1": 1,
+ "dc2": 2,
+ "dc3": 3,
+ }),
expectedReplicaMapSize: hostsPerDC * totalDCs,
},
{
name: "missing",
- strat: &networkTopology{
- dcs: map[string]int{
- "dc2": 2,
- "dc3": 3,
- },
- },
+ strat: newNetworkTopology(map[string]int{
+ "dc2": 2,
+ "dc3": 3,
+ }),
expectedReplicaMapSize: hostsPerDC * 2,
},
{
name: "zero",
- strat: &networkTopology{
- dcs: map[string]int{
- "dc1": 0,
- "dc2": 2,
- "dc3": 3,
- },
- },
+ strat: newNetworkTopology(map[string]int{
+ "dc1": 0,
+ "dc2": 2,
+ "dc3": 3,
+ }),
expectedReplicaMapSize: hostsPerDC * 2,
},
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]