This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 6baab9889bb6141c0894ffbc90177b8f09be4cd7
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Tue Apr 8 14:54:08 2025 -0700

    Fix mutation tracking startup
    
    Patch by Blake Eggleston; Reviewed by Abe Ratnofsky for CASSANDRA-20540
---
 CHANGES.txt                                                      | 3 +++
 src/java/org/apache/cassandra/replication/CoordinatorLog.java    | 7 +++++++
 .../apache/cassandra/replication/MutationTrackingService.java    | 9 ++++++++-
 .../cassandra/service/reads/tracked/ReadReconcileSend.java       | 2 +-
 src/java/org/apache/cassandra/tcm/Startup.java                   | 6 ++++--
 5 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 09fc56d681..fb9f8fbeed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,8 @@
 cep-45-mutation-tracking
+ * Fix mutation tracking startup (CASSANDRA-20540)
  * Mutation tracking journal integration, read, and write path 
(CASSANDRA-20304, CASSANDRA-20305, CASSANDRA-20308)
+ * Introduce MutationJournal for coordinator logs (CASSANDRA-20353)
+ * Copy over Journal and dependencies from cep-15-accord (CASSANDRA-20321)
 
 5.1
  * Throw new IndexBuildInProgressException when queries fail during index 
build, instead of IndexNotAvailableException (CASSANDRA-20402)
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java 
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index dbd92ce7c3..0abd0db93e 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -26,11 +26,15 @@ import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.schema.TableId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 
 public abstract class CoordinatorLog
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorLog.class);
+
     protected final int localHostId;
     protected final CoordinatorLogId logId;
     protected final Participants participants;
@@ -68,6 +72,7 @@ public abstract class CoordinatorLog
 
     void witnessedRemoteMutation(MutationId mutationId, int onHostId)
     {
+        logger.trace("witnessed remote mutation {} from {}", mutationId, 
onHostId);
         lock.writeLock().lock();
         try
         {
@@ -88,6 +93,7 @@ public abstract class CoordinatorLog
 
             if (allOtherReplicasWitnessed)
             {
+                logger.trace("marking mutation {} as fully reconciled", 
mutationId);
                 // if all replicas have now witnessed the id, remove it from 
the index
                 unreconciledMutations.remove(mutationId.offset());
                 reconciledIds.add(mutationId.offset());
@@ -117,6 +123,7 @@ public abstract class CoordinatorLog
 
     void finishWriting(Mutation mutation)
     {
+        logger.trace("witnessed local mutation {}", mutation.id());
         lock.writeLock().lock();
         try
         {
diff --git 
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java 
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 3755863464..0c48c3e498 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -37,11 +37,14 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.reads.tracked.TrackedLocalReads;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 // TODO (expected): persistence (handle restarts)
 // TODO (expected): handle topology changes
 public class MutationTrackingService
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(MutationTrackingService.class);
     public static final MutationTrackingService instance = new 
MutationTrackingService();
 
     private final TrackedLocalReads localReads = new TrackedLocalReads();
@@ -57,6 +60,8 @@ public class MutationTrackingService
         if (started)
             return;
 
+        logger.info("Starting replication tracking service");
+
         for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
             if (keyspace.useMutationTracking())
                 shards.put(keyspace.name, KeyspaceShards.make(keyspace, 
metadata, this::nextHostLogId));
@@ -80,7 +85,9 @@ public class MutationTrackingService
 
     public MutationId nextMutationId(String keyspace, Token token)
     {
-        return getOrCreate(keyspace).nextMutationId(token);
+        MutationId id = getOrCreate(keyspace).nextMutationId(token);
+        logger.trace("Created new mutation id {}", id);
+        return id;
     }
 
     public void witnessedRemoteMutation(String keyspace, Token token, 
MutationId mutationId, InetAddressAndPort onHost)
diff --git 
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java 
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java
index d42a9d37f8..e51ea282b0 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java
@@ -121,7 +121,7 @@ public class ReadReconcileSend
         @Override
         public void doVerb(Message<ReadReconcileSend> message)
         {
-            logger.trace("Received {} from {}", message.payload, 
message.from());
+            logger.trace("Received ReadReconcileSend from {}: {}", 
message.from(), message.payload);
             // TODO: check epoch and tokens?
             ReadReconcileSend payload = message.payload;
             for (PeerSync sync : message.payload.syncTasks)
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index 4509283023..ac7dedf8b5 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -152,7 +152,8 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         LocalLog.LogSpec logSpec = LocalLog.logSpec()
                                            
.withStorage(LogStorage.SystemKeyspace)
                                            
.afterReplay(Startup::scrubDataDirectories,
-                                                        (metadata) -> 
StorageService.instance.registerMBeans())
+                                                        (metadata) -> 
StorageService.instance.registerMBeans(),
+                                                        
MutationTrackingService.instance::start)
                                            .withDefaultListeners();
         ClusterMetadataService.setInstance(new ClusterMetadataService(new 
UniformRangePlacement(),
                                                                       
wrapProcessor,
@@ -265,7 +266,8 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         LocalLog.LogSpec logSpec = LocalLog.logSpec()
                                            
.withInitialState(emptyFromSystemTables)
                                            
.afterReplay(Startup::scrubDataDirectories,
-                                                        (metadata) -> 
StorageService.instance.registerMBeans())
+                                                        (metadata) -> 
StorageService.instance.registerMBeans(),
+                                                        
MutationTrackingService.instance::start)
                                            
.withStorage(LogStorage.SystemKeyspace)
                                            .withDefaultListeners();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to