This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cep-45-mutation-tracking in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push: new 6376bd3f6d Fix mutation tracking startup 6376bd3f6d is described below commit 6376bd3f6d3a679e5b2297ed7d48dea667c56f07 Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Tue Apr 8 14:54:08 2025 -0700 Fix mutation tracking startup Patch by Blake Eggleston; Reviewed by Abe Ratnofsky for CASSANDRA-20540 --- CHANGES.txt | 3 +++ src/java/org/apache/cassandra/replication/CoordinatorLog.java | 7 +++++++ .../apache/cassandra/replication/MutationTrackingService.java | 9 ++++++++- .../org/apache/cassandra/replication/ReconciliationPlan.java | 7 +++++++ .../cassandra/service/reads/tracked/ReadReconcileNotify.java | 2 +- .../cassandra/service/reads/tracked/ReadReconcileReceive.java | 2 +- .../cassandra/service/reads/tracked/ReadReconcileSend.java | 2 +- .../cassandra/service/reads/tracked/ReadReconciliations.java | 3 ++- .../service/reads/tracked/TrackedReadReconciliation.java | 3 ++- .../cassandra/service/reads/tracked/TrackedResolver.java | 10 ++++++++++ src/java/org/apache/cassandra/tcm/Startup.java | 6 ++++-- 11 files changed, 46 insertions(+), 8 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 09fc56d681..fb9f8fbeed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,8 @@ cep-45-mutation-tracking + * Fix mutation tracking startup (CASSANDRA-20540) * Mutation tracking journal integration, read, and write path (CASSANDRA-20304, CASSANDRA-20305, CASSANDRA-20308) + * Introduce MutationJournal for coordinator logs (CASSANDRA-20353) + * Copy over Journal and dependencies from cep-15-accord (CASSANDRA-20321) 5.1 * Throw new IndexBuildInProgressException when queries fail during index build, instead of IndexNotAvailableException (CASSANDRA-20402) diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java index 62c27dc0fd..d1b8dda1db 100644 --- 
a/src/java/org/apache/cassandra/replication/CoordinatorLog.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java @@ -26,11 +26,15 @@ import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; import org.apache.cassandra.schema.TableId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; public abstract class CoordinatorLog { + private static final Logger logger = LoggerFactory.getLogger(CoordinatorLog.class); + protected final int localHostId; protected final CoordinatorLogId logId; protected final Participants participants; @@ -68,6 +72,7 @@ public abstract class CoordinatorLog void witnessedRemoteMutation(MutationId mutationId, int onHostId) { + logger.trace("witnessed remote mutation {} from {}", mutationId, onHostId); lock.writeLock().lock(); try { @@ -88,6 +93,7 @@ public abstract class CoordinatorLog if (allOtherReplicasWitnessed) { + logger.trace("marking mutation {} as fully reconciled", mutationId); // if all replicas have now witnessed the id, remove it from the index unreconciledMutations.remove(mutationId.offset()); reconciledIds.add(mutationId.offset()); @@ -117,6 +123,7 @@ public abstract class CoordinatorLog void finishWriting(Mutation mutation) { + logger.trace("witnessed local mutation {}", mutation.id()); lock.writeLock().lock(); try { diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index bda2ec0fff..310dc4dac6 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -37,11 +37,14 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.reads.tracked.ReadReconciliations; import 
org.apache.cassandra.tcm.ClusterMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // TODO (expected): persistence (handle restarts) // TODO (expected): handle topology changes public class MutationTrackingService { + private static final Logger logger = LoggerFactory.getLogger(MutationTrackingService.class); public static final MutationTrackingService instance = new MutationTrackingService(); private final ReadReconciliations reconciliations = new ReadReconciliations(); @@ -57,6 +60,8 @@ public class MutationTrackingService if (started) return; + logger.info("Starting replication tracking service"); + for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces()) if (keyspace.useMutationTracking()) shards.put(keyspace.name, KeyspaceShards.make(keyspace, metadata, this::nextHostLogId)); @@ -80,7 +85,9 @@ public class MutationTrackingService public MutationId nextMutationId(String keyspace, Token token) { - return getOrCreate(keyspace).nextMutationId(token); + MutationId id = getOrCreate(keyspace).nextMutationId(token); + logger.trace("Created new mutation id {}", id); + return id; } public void witnessedRemoteMutation(String keyspace, Token token, MutationId mutationId, InetAddressAndPort onHost) diff --git a/src/java/org/apache/cassandra/replication/ReconciliationPlan.java b/src/java/org/apache/cassandra/replication/ReconciliationPlan.java index 2347342552..cf0946bce8 100644 --- a/src/java/org/apache/cassandra/replication/ReconciliationPlan.java +++ b/src/java/org/apache/cassandra/replication/ReconciliationPlan.java @@ -45,6 +45,13 @@ public class ReconciliationPlan this.coordinatorIds = coordinatorIds; } + @Override + public String toString() { + return "PeerReconciliation{" + + "coordinatorIds=" + coordinatorIds + + '}'; + } + public Set<ShortMutationId> ids() { int size = 0; diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java 
index 83808e4e73..e5c63c342c 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java @@ -63,7 +63,7 @@ public class ReadReconcileNotify public void doVerb(Message<ReadReconcileNotify> message) throws IOException { ReadReconcileNotify notify = message.payload; - logger.trace("Received read reconcile notify {} from {}", notify, message.from()); + logger.trace("Received read reconcile notify from {}: {}", message.from(), notify); MutationTrackingService.instance.reconciliations().acknowledgeSync(notify.reconciliationId, notify.syncId); } }; diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileReceive.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileReceive.java index fb86f68a40..05fa52aa0d 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileReceive.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileReceive.java @@ -182,7 +182,7 @@ public class ReadReconcileReceive { // TODO: check epoch and tokens? 
ReadReconcileReceive receive = message.payload; - logger.trace("Received read reconciliation {} from", receive, message.from()); + logger.trace("Received read reconciliation from {}: {}", message.from(), receive); if (receive.kind.writeLocally()) { receive.mutations.forEach(Mutation::apply); diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java index 217f12416c..91066339ee 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java @@ -129,7 +129,7 @@ public class ReadReconcileSend @Override public void doVerb(Message<ReadReconcileSend> message) { - logger.trace("Received {} from {}", message.payload, message.from()); + logger.trace("Received ReadReconcileSend from {}: {}", message.from(), message.payload); // TODO: check epoch and tokens? ReadReconcileSend payload = message.payload; for (PeerSync sync : message.payload.syncTasks) diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java index 075011d57b..8ba7be954a 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java @@ -102,7 +102,8 @@ public class ReadReconciliations implements Shutdownable n++; } } - logger.trace("Expired {} entries", n); + if (n > 0) + logger.trace("Expired {} entries", n); } public ReadReconciliations() diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedReadReconciliation.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedReadReconciliation.java index f9f4b4adfb..5accd6ebb3 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedReadReconciliation.java +++ 
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedReadReconciliation.java @@ -378,6 +378,7 @@ public class TrackedReadReconciliation<E extends Endpoints<E>, P extends Replica Data data = new Data(0, command, dataNode, dataResponse, Collections.emptySet(), resultConsumer); Preconditions.checkState(data.isComplete()); state = new State.Complete(data); + logger.trace("Mutation summaries matched: {}", replicaPlan); return; } @@ -386,7 +387,7 @@ public class TrackedReadReconciliation<E extends Endpoints<E>, P extends Replica long expiresAt = requestTime.computeDeadline(command.getTimeout(TimeUnit.NANOSECONDS)); long reconciliationId = MutationTrackingService.instance.reconciliations().newReconciliation(this, expiresAt); - logger.trace("New reconciliation {} with timeout {}", reconciliationId, command.timeoutNanos()); + logger.trace("Starting new reconciliation {}", reconciliationId); state = new State.Pending(reconciliationId, plans, command, dataNode, dataResponse, resultConsumer); state.asPending().sendSyncMessages(); diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedResolver.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedResolver.java index 8b39d679d1..d6351f8ec7 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedResolver.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedResolver.java @@ -32,9 +32,12 @@ import org.apache.cassandra.net.Message; import org.apache.cassandra.service.reads.IReadResponse; import org.apache.cassandra.service.reads.ResponseResolver; import org.apache.cassandra.transport.Dispatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TrackedResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>> extends ResponseResolver<E, P> { + private static final Logger logger = LoggerFactory.getLogger(TrackedResolver.class); private volatile Message<IReadResponse> dataResponse; public TrackedResolver(ReadCommand command, 
Supplier<? extends P> replicaPlan, Dispatcher.RequestTime requestTime) @@ -52,6 +55,12 @@ public class TrackedResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRe @Override public void onResponseReceived(Message<IReadResponse> message) { + if (logger.isTraceEnabled()) + { + TrackedReadResponse response = TrackedReadResponse.fromResponse(message.payload); + logger.trace("Received response summary from {}: {}", message.from(), response.summary); + } + if (dataResponse == null && message.payload instanceof TrackedReadResponse.Data) dataResponse = message; } @@ -83,6 +92,7 @@ public class TrackedResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRe first = false; } + logger.trace("All mutation summaries match"); return true; } diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 4509283023..ac7dedf8b5 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -152,7 +152,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; LocalLog.LogSpec logSpec = LocalLog.logSpec() .withStorage(LogStorage.SystemKeyspace) .afterReplay(Startup::scrubDataDirectories, - (metadata) -> StorageService.instance.registerMBeans()) + (metadata) -> StorageService.instance.registerMBeans(), + MutationTrackingService.instance::start) .withDefaultListeners(); ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), wrapProcessor, @@ -265,7 +266,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; LocalLog.LogSpec logSpec = LocalLog.logSpec() .withInitialState(emptyFromSystemTables) .afterReplay(Startup::scrubDataDirectories, - (metadata) -> StorageService.instance.registerMBeans()) + (metadata) -> StorageService.instance.registerMBeans(), + MutationTrackingService.instance::start) .withStorage(LogStorage.SystemKeyspace) .withDefaultListeners(); 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org