This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by 
this push:
     new 6376bd3f6d Fix mutation tracking startup
6376bd3f6d is described below

commit 6376bd3f6d3a679e5b2297ed7d48dea667c56f07
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Tue Apr 8 14:54:08 2025 -0700

    Fix mutation tracking startup
    
    Patch by Blake Eggleston; Reviewed by Abe Ratnofsky for CASSANDRA-20540
---
 CHANGES.txt                                                    |  3 +++
 src/java/org/apache/cassandra/replication/CoordinatorLog.java  |  7 +++++++
 .../apache/cassandra/replication/MutationTrackingService.java  |  9 ++++++++-
 .../org/apache/cassandra/replication/ReconciliationPlan.java   |  7 +++++++
 .../cassandra/service/reads/tracked/ReadReconcileNotify.java   |  2 +-
 .../cassandra/service/reads/tracked/ReadReconcileReceive.java  |  2 +-
 .../cassandra/service/reads/tracked/ReadReconcileSend.java     |  2 +-
 .../cassandra/service/reads/tracked/ReadReconciliations.java   |  3 ++-
 .../service/reads/tracked/TrackedReadReconciliation.java       |  3 ++-
 .../cassandra/service/reads/tracked/TrackedResolver.java       | 10 ++++++++++
 src/java/org/apache/cassandra/tcm/Startup.java                 |  6 ++++--
 11 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 09fc56d681..fb9f8fbeed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,8 @@
 cep-45-mutation-tracking
+ * Fix mutation tracking startup (CASSANDRA-20540)
  * Mutation tracking journal integration, read, and write path 
(CASSANDRA-20304, CASSANDRA-20305, CASSANDRA-20308)
+ * Introduce MutationJournal for coordinator logs (CASSANDRA-20353)
+ * Copy over Journal and dependencies from cep-15-accord (CASSANDRA-20321)
 
 5.1
  * Throw new IndexBuildInProgressException when queries fail during index 
build, instead of IndexNotAvailableException (CASSANDRA-20402)
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java 
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 62c27dc0fd..d1b8dda1db 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -26,11 +26,15 @@ import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.schema.TableId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 
 public abstract class CoordinatorLog
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorLog.class);
+
     protected final int localHostId;
     protected final CoordinatorLogId logId;
     protected final Participants participants;
@@ -68,6 +72,7 @@ public abstract class CoordinatorLog
 
     void witnessedRemoteMutation(MutationId mutationId, int onHostId)
     {
+        logger.trace("witnessed remote mutation {} from {}", mutationId, 
onHostId);
         lock.writeLock().lock();
         try
         {
@@ -88,6 +93,7 @@ public abstract class CoordinatorLog
 
             if (allOtherReplicasWitnessed)
             {
+                logger.trace("marking mutation {} as fully reconciled", 
mutationId);
                 // if all replicas have now witnessed the id, remove it from 
the index
                 unreconciledMutations.remove(mutationId.offset());
                 reconciledIds.add(mutationId.offset());
@@ -117,6 +123,7 @@ public abstract class CoordinatorLog
 
     void finishWriting(Mutation mutation)
     {
+        logger.trace("witnessed local mutation {}", mutation.id());
         lock.writeLock().lock();
         try
         {
diff --git 
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java 
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index bda2ec0fff..310dc4dac6 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -37,11 +37,14 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.reads.tracked.ReadReconciliations;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 // TODO (expected): persistence (handle restarts)
 // TODO (expected): handle topology changes
 public class MutationTrackingService
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(MutationTrackingService.class);
     public static final MutationTrackingService instance = new 
MutationTrackingService();
 
     private final ReadReconciliations reconciliations = new 
ReadReconciliations();
@@ -57,6 +60,8 @@ public class MutationTrackingService
         if (started)
             return;
 
+        logger.info("Starting replication tracking service");
+
         for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
             if (keyspace.useMutationTracking())
                 shards.put(keyspace.name, KeyspaceShards.make(keyspace, 
metadata, this::nextHostLogId));
@@ -80,7 +85,9 @@ public class MutationTrackingService
 
     public MutationId nextMutationId(String keyspace, Token token)
     {
-        return getOrCreate(keyspace).nextMutationId(token);
+        MutationId id = getOrCreate(keyspace).nextMutationId(token);
+        logger.trace("Created new mutation id {}", id);
+        return id;
     }
 
     public void witnessedRemoteMutation(String keyspace, Token token, 
MutationId mutationId, InetAddressAndPort onHost)
diff --git a/src/java/org/apache/cassandra/replication/ReconciliationPlan.java 
b/src/java/org/apache/cassandra/replication/ReconciliationPlan.java
index 2347342552..cf0946bce8 100644
--- a/src/java/org/apache/cassandra/replication/ReconciliationPlan.java
+++ b/src/java/org/apache/cassandra/replication/ReconciliationPlan.java
@@ -45,6 +45,13 @@ public class ReconciliationPlan
             this.coordinatorIds = coordinatorIds;
         }
 
+        @Override
+        public String toString() {
+            return "PeerReconciliation{" +
+                    "coordinatorIds=" + coordinatorIds +
+                    '}';
+        }
+
         public Set<ShortMutationId> ids()
         {
             int size = 0;
diff --git 
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java 
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java
index 83808e4e73..e5c63c342c 100644
--- 
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java
+++ 
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java
@@ -63,7 +63,7 @@ public class ReadReconcileNotify
         public void doVerb(Message<ReadReconcileNotify> message) throws 
IOException
         {
             ReadReconcileNotify notify = message.payload;
-            logger.trace("Received read reconcile notify {} from {}", notify, 
message.from());
+            logger.trace("Received read reconcile notify from {}: {}", 
message.from(), notify);
             
MutationTrackingService.instance.reconciliations().acknowledgeSync(notify.reconciliationId,
 notify.syncId);
         }
     };
diff --git 
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileReceive.java 
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileReceive.java
index fb86f68a40..05fa52aa0d 100644
--- 
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileReceive.java
+++ 
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileReceive.java
@@ -182,7 +182,7 @@ public class ReadReconcileReceive
         {
             // TODO: check epoch and tokens?
             ReadReconcileReceive receive = message.payload;
-            logger.trace("Received read reconciliation {} from", receive, 
message.from());
+            logger.trace("Received read reconciliation from {}: {}", 
message.from(), receive);
             if (receive.kind.writeLocally())
             {
                 receive.mutations.forEach(Mutation::apply);
diff --git 
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java 
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java
index 217f12416c..91066339ee 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java
@@ -129,7 +129,7 @@ public class ReadReconcileSend
         @Override
         public void doVerb(Message<ReadReconcileSend> message)
         {
-            logger.trace("Received {} from {}", message.payload, 
message.from());
+            logger.trace("Received ReadReconcileSend from {}: {}", 
message.from(), message.payload);
             // TODO: check epoch and tokens?
             ReadReconcileSend payload = message.payload;
             for (PeerSync sync : message.payload.syncTasks)
diff --git 
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java 
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
index 075011d57b..8ba7be954a 100644
--- 
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
+++ 
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
@@ -102,7 +102,8 @@ public class ReadReconciliations implements Shutdownable
                     n++;
             }
         }
-        logger.trace("Expired {} entries", n);
+        if (n > 0)
+            logger.trace("Expired {} entries", n);
     }
 
     public ReadReconciliations()
diff --git 
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedReadReconciliation.java
 
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedReadReconciliation.java
index f9f4b4adfb..5accd6ebb3 100644
--- 
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedReadReconciliation.java
+++ 
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedReadReconciliation.java
@@ -378,6 +378,7 @@ public class TrackedReadReconciliation<E extends 
Endpoints<E>, P extends Replica
             Data data = new Data(0, command, dataNode, dataResponse, 
Collections.emptySet(), resultConsumer);
             Preconditions.checkState(data.isComplete());
             state = new State.Complete(data);
+            logger.trace("Mutation summaries matched: {}", replicaPlan);
             return;
         }
 
@@ -386,7 +387,7 @@ public class TrackedReadReconciliation<E extends 
Endpoints<E>, P extends Replica
 
         long expiresAt = 
requestTime.computeDeadline(command.getTimeout(TimeUnit.NANOSECONDS));
         long reconciliationId = 
MutationTrackingService.instance.reconciliations().newReconciliation(this, 
expiresAt);
-        logger.trace("New reconciliation {} with timeout {}", 
reconciliationId, command.timeoutNanos());
+        logger.trace("Starting new reconciliation {}", reconciliationId);
 
         state = new State.Pending(reconciliationId, plans, command, dataNode, 
dataResponse, resultConsumer);
         state.asPending().sendSyncMessages();
diff --git 
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedResolver.java 
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedResolver.java
index 8b39d679d1..d6351f8ec7 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedResolver.java
@@ -32,9 +32,12 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.reads.IReadResponse;
 import org.apache.cassandra.service.reads.ResponseResolver;
 import org.apache.cassandra.transport.Dispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TrackedResolver<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E, P>> extends ResponseResolver<E, P>
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(TrackedResolver.class);
     private volatile Message<IReadResponse> dataResponse;
 
     public TrackedResolver(ReadCommand command, Supplier<? extends P> 
replicaPlan, Dispatcher.RequestTime requestTime)
@@ -52,6 +55,12 @@ public class TrackedResolver<E extends Endpoints<E>, P 
extends ReplicaPlan.ForRe
     @Override
     public void onResponseReceived(Message<IReadResponse> message)
     {
+        if (logger.isTraceEnabled())
+        {
+            TrackedReadResponse response = 
TrackedReadResponse.fromResponse(message.payload);
+            logger.trace("Received response summary from {}: {}", 
message.from(), response.summary);
+        }
+
         if (dataResponse == null && message.payload instanceof 
TrackedReadResponse.Data)
             dataResponse = message;
     }
@@ -83,6 +92,7 @@ public class TrackedResolver<E extends Endpoints<E>, P 
extends ReplicaPlan.ForRe
 
             first = false;
         }
+        logger.trace("All mutation summaries match");
         return true;
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index 4509283023..ac7dedf8b5 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -152,7 +152,8 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         LocalLog.LogSpec logSpec = LocalLog.logSpec()
                                            
.withStorage(LogStorage.SystemKeyspace)
                                            
.afterReplay(Startup::scrubDataDirectories,
-                                                        (metadata) -> 
StorageService.instance.registerMBeans())
+                                                        (metadata) -> 
StorageService.instance.registerMBeans(),
+                                                        
MutationTrackingService.instance::start)
                                            .withDefaultListeners();
         ClusterMetadataService.setInstance(new ClusterMetadataService(new 
UniformRangePlacement(),
                                                                       
wrapProcessor,
@@ -265,7 +266,8 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         LocalLog.LogSpec logSpec = LocalLog.logSpec()
                                            
.withInitialState(emptyFromSystemTables)
                                            
.afterReplay(Startup::scrubDataDirectories,
-                                                        (metadata) -> 
StorageService.instance.registerMBeans())
+                                                        (metadata) -> 
StorageService.instance.registerMBeans(),
+                                                        
MutationTrackingService.instance::start)
                                            
.withStorage(LogStorage.SystemKeyspace)
                                            .withDefaultListeners();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to