This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cep-45-mutation-tracking in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 6baab9889bb6141c0894ffbc90177b8f09be4cd7 Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Tue Apr 8 14:54:08 2025 -0700 Fix mutation tracking startup Patch by Blake Eggleston; Reviewed by Abe Ratnofsky for CASSANDRA-20540 --- CHANGES.txt | 3 +++ src/java/org/apache/cassandra/replication/CoordinatorLog.java | 7 +++++++ .../apache/cassandra/replication/MutationTrackingService.java | 9 ++++++++- .../cassandra/service/reads/tracked/ReadReconcileSend.java | 2 +- src/java/org/apache/cassandra/tcm/Startup.java | 6 ++++-- 5 files changed, 23 insertions(+), 4 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 09fc56d681..fb9f8fbeed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,8 @@ cep-45-mutation-tracking + * Fix mutation tracking startup (CASSANDRA-20540) * Mutation tracking journal integration, read, and write path (CASSANDRA-20304, CASSANDRA-20305, CASSANDRA-20308) + * Introduce MutationJournal for coordinator logs (CASSANDRA-20353) + * Copy over Journal and dependencies from cep-15-accord (CASSANDRA-20321) 5.1 * Throw new IndexBuildInProgressException when queries fail during index build, instead of IndexNotAvailableException (CASSANDRA-20402) diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java index dbd92ce7c3..0abd0db93e 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java @@ -26,11 +26,15 @@ import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; import org.apache.cassandra.schema.TableId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; public abstract class CoordinatorLog { + private static final Logger logger = LoggerFactory.getLogger(CoordinatorLog.class); + protected final int localHostId; protected final CoordinatorLogId logId; protected final Participants participants; @@ -68,6 +72,7 @@ public abstract class CoordinatorLog void witnessedRemoteMutation(MutationId mutationId, int onHostId) { + logger.trace("witnessed remote mutation {} from {}", mutationId, onHostId); lock.writeLock().lock(); try { @@ -88,6 +93,7 @@ public abstract class CoordinatorLog if (allOtherReplicasWitnessed) { + logger.trace("marking mutation {} as fully reconciled", mutationId); // if all replicas have now witnessed the id, remove it from the index unreconciledMutations.remove(mutationId.offset()); reconciledIds.add(mutationId.offset()); @@ -117,6 +123,7 @@ public abstract class CoordinatorLog void finishWriting(Mutation mutation) { + logger.trace("witnessed local mutation {}", mutation.id()); lock.writeLock().lock(); try { diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index 3755863464..0c48c3e498 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -37,11 +37,14 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.reads.tracked.TrackedLocalReads; import org.apache.cassandra.tcm.ClusterMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // TODO (expected): persistence (handle restarts) // TODO (expected): handle topology changes public class MutationTrackingService { + private static final Logger logger = LoggerFactory.getLogger(MutationTrackingService.class); public static final MutationTrackingService instance = new MutationTrackingService(); private final TrackedLocalReads localReads = new TrackedLocalReads(); @@ -57,6 +60,8 @@ public class MutationTrackingService if (started) return; + logger.info("Starting replication tracking service"); + for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces()) if (keyspace.useMutationTracking()) shards.put(keyspace.name, KeyspaceShards.make(keyspace, metadata, this::nextHostLogId)); @@ -80,7 +85,9 @@ public class MutationTrackingService public MutationId nextMutationId(String keyspace, Token token) { - return getOrCreate(keyspace).nextMutationId(token); + MutationId id = getOrCreate(keyspace).nextMutationId(token); + logger.trace("Created new mutation id {}", id); + return id; } public void witnessedRemoteMutation(String keyspace, Token token, MutationId mutationId, InetAddressAndPort onHost) diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java index d42a9d37f8..e51ea282b0 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileSend.java @@ -121,7 +121,7 @@ public class ReadReconcileSend @Override public void doVerb(Message<ReadReconcileSend> message) { - logger.trace("Received {} from {}", message.payload, message.from()); + logger.trace("Received ReadReconcileSend from {}: {}", message.from(), message.payload); // TODO: check epoch and tokens? ReadReconcileSend payload = message.payload; for (PeerSync sync : message.payload.syncTasks) diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 4509283023..ac7dedf8b5 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -152,7 +152,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; LocalLog.LogSpec logSpec = LocalLog.logSpec() .withStorage(LogStorage.SystemKeyspace) .afterReplay(Startup::scrubDataDirectories, - (metadata) -> StorageService.instance.registerMBeans()) + (metadata) -> StorageService.instance.registerMBeans(), + MutationTrackingService.instance::start) .withDefaultListeners(); ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), wrapProcessor, @@ -265,7 +266,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; LocalLog.LogSpec logSpec = LocalLog.logSpec() .withInitialState(emptyFromSystemTables) .afterReplay(Startup::scrubDataDirectories, - (metadata) -> StorageService.instance.registerMBeans()) + (metadata) -> StorageService.instance.registerMBeans(), + MutationTrackingService.instance::start) .withStorage(LogStorage.SystemKeyspace) .withDefaultListeners(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org