This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit b9ac1602d9d8d55dabbefe044560d7564cf14952 Author: Aleksey Yeschenko <alek...@apache.org> AuthorDate: Fri Apr 4 12:38:28 2025 +0100 Implement coordinator log offset broadcasting patch by Aleksey Yeschenko; reviewed by Blake Eggleston for CASSANDRA-20576 --- src/java/org/apache/cassandra/net/Verb.java | 2 + .../cassandra/replication/CoordinatorLog.java | 157 ++++++++++++++------- .../cassandra/replication/CoordinatorLogId.java | 17 ++- .../cassandra/replication/ForwardedWrite.java | 2 +- .../cassandra/replication/MutationSummary.java | 8 ++ .../replication/MutationTrackingService.java | 129 +++++++++++++++-- .../org/apache/cassandra/replication/Offsets.java | 21 +-- .../apache/cassandra/replication/Participants.java | 5 + .../org/apache/cassandra/replication/Shard.java | 59 +++++++- .../replication/ShardReplicatedOffsets.java | 108 ++++++++++++++ .../cassandra/replication/TrackedWriteRequest.java | 12 +- ...articipants.java => UnreconciledMutations.java} | 44 ++---- ...ates.java => UnreconciledMutationsReplica.java} | 17 ++- .../service/TrackedWriteResponseHandler.java | 2 +- .../service/reads/tracked/ReadReconcileNotify.java | 2 +- .../test/tracking/OffsetBroadcastTest.java | 79 +++++++++++ .../cassandra/replication/CoordinatorLogTest.java | 6 +- .../apache/cassandra/replication/OffsetsTest.java | 10 +- 18 files changed, 541 insertions(+), 139 deletions(-) diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index b96978ae7e..af287d6305 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -73,6 +73,7 @@ import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.repair.messages.ValidationResponse; import org.apache.cassandra.repair.messages.ValidationRequest; import org.apache.cassandra.replication.ForwardedWrite; +import org.apache.cassandra.replication.ShardReplicatedOffsets; import org.apache.cassandra.schema.SchemaMutationsSerializer; import org.apache.cassandra.schema.SchemaPullVerbHandler; import org.apache.cassandra.schema.SchemaPushVerbHandler; @@ -249,6 +250,7 @@ public enum Verb READ_RECONCILE_RCV (902, P0, rpcTimeout, MUTATION, () -> ReadReconcileReceive.serializer, () -> ReadReconcileReceive.verbHandler ), READ_RECONCILE_NOTIFY (903, P0, rpcTimeout, REQUEST_RESPONSE, () -> ReadReconcileNotify.serializer, () -> ReadReconcileNotify.verbHandler ), FORWARDING_WRITE (904, P3, writeTimeout, MUTATION, () -> ForwardedWrite.Request.serializer, () -> ForwardedWrite.verbHandler), + BROADCAST_LOG_OFFSETS (905, P1, rpcTimeout, MISC, () -> ShardReplicatedOffsets.serializer, () -> ShardReplicatedOffsets.verbHandler), TRACKED_PARTITION_READ_RSP (905, P2, readTimeout, REQUEST_RESPONSE, () -> TrackedDataResponse.serializer, () -> ResponseVerbHandler.instance ), TRACKED_PARTITION_READ_REQ (906, P3, readTimeout, READ, () -> TrackedRead.DataRequest.serializer, () -> TrackedRead.verbHandler, TRACKED_PARTITION_READ_RSP), diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java index 74712fd968..ea21a99a31 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java @@ -21,15 +21,17 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.common.base.Preconditions; +import 
javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; import org.apache.cassandra.schema.TableId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; @@ -41,29 +43,26 @@ public abstract class CoordinatorLog protected final CoordinatorLogId logId; protected final Participants participants; - /** - * State machines and an Id <-> token index for unreconciled mutation ids that exist oh this host. - */ - private final LocalMutationStates unreconciledMutations; + protected final Offsets.Mutable[] witnessedOffsets; + protected final Offsets.Mutable reconciledOffsets; - protected final Offsets.Mutable[] witnessedIds; - protected final Offsets.Mutable reconciledIds; protected final ReadWriteLock lock; + abstract UnreconciledMutations unreconciledMutations(); + CoordinatorLog(int localHostId, CoordinatorLogId logId, Participants participants) { this.localHostId = localHostId; this.logId = logId; this.participants = participants; - this.unreconciledMutations = new LocalMutationStates(); this.lock = new ReentrantReadWriteLock(); Offsets.Mutable[] ids = new Offsets.Mutable[participants.size()]; for (int i = 0; i < participants.size(); i++) ids[i] = new Offsets.Mutable(logId); - witnessedIds = ids; - reconciledIds = new Offsets.Mutable(logId); + witnessedOffsets = ids; + reconciledOffsets = new Offsets.Mutable(logId); } static CoordinatorLog create(int localHostId, CoordinatorLogId id, Participants participants) @@ -72,7 +71,7 @@ public abstract class CoordinatorLog : new CoordinatorLogReplica(localHostId, id, participants); } - void witnessedRemoteMutation(MutationId mutationId, int onHostId) + void receivedWriteResponse(MutationId mutationId, int onHostId) { Preconditions.checkArgument(!mutationId.isNone()); logger.trace("witnessed remote mutation {} from {}", mutationId, onHostId); @@ -80,26 +79,17 @@ public abstract class CoordinatorLog try { if (!get(onHostId).add(mutationId.offset())) - return; // already witnessed + return; // already witnessed; very uncommon but possible path if (!getLocal().contains(mutationId.offset())) - return; // local host hasn't witnessed -> no cleanup needed + return; // local host hasn't witnessed yet -> no cleanup needed - // see if any other replicas haven't witnessed the id yet - boolean allOtherReplicasWitnessed = true; - for (int i = 0; i < participants.size() && allOtherReplicasWitnessed; i++) - { - int hostId = participants.get(i); - if (hostId != onHostId && hostId != localHostId && !get(hostId).contains(mutationId.offset())) - allOtherReplicasWitnessed = false; - } - - if (allOtherReplicasWitnessed) + if (hasWrittenToRemoteReplicas(mutationId.offset())) { logger.trace("marking mutation {} as fully reconciled", mutationId); // if all replicas have now witnessed the id, remove it from the index - unreconciledMutations.remove(mutationId.offset()); - reconciledIds.add(mutationId.offset()); + unreconciledMutations().remove(mutationId.offset()); + reconciledOffsets.add(mutationId.offset()); } } finally @@ -108,6 +98,46 @@ public abstract class CoordinatorLog } } + void updateReplicatedOffsets(Offsets offsets, int onHostId) + { + lock.writeLock().lock(); + try + { + get(onHostId).addAll(offsets, (ignore, start, end) -> + { + for (int 
offset = start; offset <= end; ++offset) + { + // TODO (desired): skip checking the host's offsets - all just added + // TODO (desired): use the fact that Offsets are ordered to optimise this look up + if (hasWrittenLocally(offset) && hasWrittenToRemoteReplicas(offset)) + { + reconciledOffsets.add(offset); + unreconciledMutations().remove(offset); + } + } + }); + } + finally + { + lock.writeLock().unlock(); + } + } + + @Nullable + Offsets.Immutable collectReplicatedOffsets() + { + lock.readLock().lock(); + try + { + Offsets offsets = witnessedOffsets[participants.indexOf(localHostId)]; + return offsets.isEmpty() ? null : Offsets.Immutable.copy(offsets); + } + finally + { + lock.readLock().unlock(); + } + } + void startWriting(Mutation mutation) { lock.writeLock().lock(); @@ -116,7 +146,7 @@ public abstract class CoordinatorLog if (getLocal().contains(mutation.id().offset())) return; // already witnessed; shouldn't get to this path often (duplicate mutation) - unreconciledMutations.startWriting(mutation); + unreconciledMutations().startWriting(mutation); } finally { @@ -127,27 +157,21 @@ public abstract class CoordinatorLog void finishWriting(Mutation mutation) { logger.trace("witnessed local mutation {}", mutation.id()); + lock.writeLock().lock(); try { - if (!getLocal().add(mutation.id().offset())) - throw new IllegalStateException("finishWriting() called on a reconciled mutation"); + int offset = mutation.id().offset(); + if (!getLocal().add(offset)) + throw new IllegalStateException("finishWriting() called on a locally witnessed mutation " + mutation.id()); + + unreconciledMutations().finishWriting(mutation); - // see if any other replicas haven't witnessed the id yet - boolean allOtherReplicasWitnessed = true; - for (int i = 0; i < participants.size() && allOtherReplicasWitnessed; i++) + if (hasWrittenToRemoteReplicas(offset)) { - int hostId = participants.get(i); - if (hostId != localHostId && !get(hostId).contains(mutation.id().offset())) - allOtherReplicasWitnessed = false; + reconciledOffsets.add(offset); + unreconciledMutations().remove(offset); } - - // if some replicas also haven't witnessed the mutation yet, we should update local mutation state; - // otherwise we are the last node to witness this mutation, and can clean it up - if (allOtherReplicasWitnessed) - reconciledIds.add(mutation.id().offset()); - else - unreconciledMutations.finishWriting(mutation); } finally { @@ -155,6 +179,22 @@ public abstract class CoordinatorLog } } + private boolean hasWrittenLocally(int offset) + { + return getLocal().contains(offset); + } + + private boolean hasWrittenToRemoteReplicas(int offset) + { + for (int i = 0; i < participants.size(); ++i) + { + int hostId = participants.get(i); + if (hostId != localHostId && !get(hostId).contains(offset)) + return false; + } + return true; + } + /** * Look up unreconciled sequence ids of mutations witnessed by this host in this coordinataor log. * Adds the ids to the supplied collection, so it can be reused to aggregate lookups for multiple logs. 
@@ -164,8 +204,8 @@ public abstract class CoordinatorLog lock.readLock().lock(); try { - reconciledInto.addAll(reconciledIds); - return unreconciledMutations.collect(token, tableId, includePending, unreconciledInto); + reconciledInto.addAll(reconciledOffsets); + return unreconciledMutations().collect(token, tableId, includePending, unreconciledInto); } finally { @@ -182,8 +222,8 @@ public abstract class CoordinatorLog lock.readLock().lock(); try { - reconciledInto.addAll(reconciledIds); - return unreconciledMutations.collect(range, tableId, includePending, unreconciledInto); + reconciledInto.addAll(reconciledOffsets); + return unreconciledMutations().collect(range, tableId, includePending, unreconciledInto); } finally { @@ -193,21 +233,29 @@ public abstract class CoordinatorLog protected Offsets.Mutable get(int hostId) { - return witnessedIds[participants.indexOf(hostId)]; + return witnessedOffsets[participants.indexOf(hostId)]; } protected Offsets.Mutable getLocal() { - return witnessedIds[participants.indexOf(localHostId)]; + return witnessedOffsets[participants.indexOf(localHostId)]; } public static class CoordinatorLogPrimary extends CoordinatorLog { - AtomicLong sequenceId = new AtomicLong(-1); + private final AtomicLong sequenceId = new AtomicLong(-1); + private final UnreconciledMutationsReplica unreconciledMutations; CoordinatorLogPrimary(int localHostId, CoordinatorLogId logId, Participants participants) { super(localHostId, logId, participants); + unreconciledMutations = new UnreconciledMutationsReplica(); + } + + @Override + UnreconciledMutationsReplica unreconciledMutations() + { + return unreconciledMutations; } MutationId nextId() @@ -235,9 +283,18 @@ public abstract class CoordinatorLog public static class CoordinatorLogReplica extends CoordinatorLog { + private final UnreconciledMutationsReplica unreconciledMutations; + CoordinatorLogReplica(int localHostId, CoordinatorLogId logId, Participants participants) { super(localHostId, logId, participants); + this.unreconciledMutations = new UnreconciledMutationsReplica(); + } + + @Override + UnreconciledMutationsReplica unreconciledMutations() + { + return unreconciledMutations; } } } diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java index 37d4f25851..f7dd55cce7 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java @@ -118,7 +118,7 @@ public class CoordinatorLogId implements Serializable public static final Comparator<CoordinatorLogId> comparator = (l, r) -> Long.compareUnsigned(l.asLong(), r.asLong()); - public static final IVersionedSerializer<CoordinatorLogId> serializer = new IVersionedSerializer<>() + static final class Serializer implements IVersionedSerializer<CoordinatorLogId> { @Override public void serialize(CoordinatorLogId logId, DataOutputPlus out, int version) throws IOException @@ -127,6 +127,12 @@ public class CoordinatorLogId implements Serializable out.writeInt(logId.hostLogId); } + public void serialize(long logId, DataOutputPlus out, int version) throws IOException + { + out.writeInt(hostId(logId)); + out.writeInt(hostLogId(logId)); + } + @Override public CoordinatorLogId deserialize(DataInputPlus in, int version) throws IOException { @@ -142,5 +148,12 @@ public class CoordinatorLogId implements Serializable { return TypeSizes.sizeof(logId.hostId) + TypeSizes.sizeof(logId.hostLogId); } - }; + + public long serializedSize(long 
logId, int version) + { + return TypeSizes.sizeof(logId); + } + } + + static final Serializer serializer = new Serializer(); } diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java b/src/java/org/apache/cassandra/replication/ForwardedWrite.java index 51a072a423..41153e307f 100644 --- a/src/java/org/apache/cassandra/replication/ForwardedWrite.java +++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java @@ -402,7 +402,7 @@ public class ForwardedWrite { // Local mutations are witnessed from Keyspace.applyInternalTracked if (msg != null) - MutationTrackingService.instance.witnessedRemoteMutation(keyspace, token, id, msg.from()); + MutationTrackingService.instance.receivedWriteResponse(keyspace, token, id, msg.from()); // Local write needs to be ack'd to coordinator if (msg == null && ackTo != null) diff --git a/src/java/org/apache/cassandra/replication/MutationSummary.java b/src/java/org/apache/cassandra/replication/MutationSummary.java index 53a18d1046..c786b295fe 100644 --- a/src/java/org/apache/cassandra/replication/MutationSummary.java +++ b/src/java/org/apache/cassandra/replication/MutationSummary.java @@ -255,6 +255,14 @@ public class MutationSummary return count; } + public int reconciledIds() + { + int count = 0; + for (CoordinatorSummary summary : summaries) + count += summary.reconciled.offsetCount(); + return count; + } + public int size() { return summaries.size(); diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index 3e48840531..efff030e7b 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -18,30 +18,44 @@ package org.apache.cassandra.replication; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.IntSupplier; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.agrona.collections.IntArrayList; +import org.apache.cassandra.concurrent.ScheduledExecutorPlus; +import org.apache.cassandra.concurrent.Shutdownable; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.Verb; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.reads.tracked.TrackedLocalReads; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.FBUtilities; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; +import static org.apache.cassandra.concurrent.ExecutorFactory.SimulatorSemantics.NORMAL; + // TODO (expected): persistence (handle restarts) // TODO (expected): handle topology changes public class 
MutationTrackingService @@ -50,6 +64,7 @@ public class MutationTrackingService public static final MutationTrackingService instance = new MutationTrackingService(); private final TrackedLocalReads localReads = new TrackedLocalReads(); + private final ReplicatedOffsetsBroadcaster broadcaster = new ReplicatedOffsetsBroadcaster(); private final ConcurrentHashMap<String, KeyspaceShards> shards = new ConcurrentHashMap<>(); private volatile boolean started = false; @@ -67,6 +82,9 @@ public class MutationTrackingService for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces()) if (keyspace.useMutationTracking()) shards.put(keyspace.name, KeyspaceShards.make(keyspace, metadata, this::nextHostLogId)); + + broadcaster.start(); + started = true; } @@ -92,10 +110,15 @@ public class MutationTrackingService return id; } - public void witnessedRemoteMutation(String keyspace, Token token, MutationId mutationId, InetAddressAndPort onHost) + public void receivedWriteResponse(String keyspace, Token token, MutationId mutationId, InetAddressAndPort onHost) { Preconditions.checkArgument(!mutationId.isNone()); - getOrCreate(keyspace).witnessedRemoteMutation(token, mutationId, onHost); + getOrCreate(keyspace).receivedWriteResponse(token, mutationId, onHost); + } + + public void updateReplicatedOffsets(String keyspace, Range<Token> range, List<? extends Offsets> offsets, InetAddressAndPort onHost) + { + getOrCreate(keyspace).updateReplicatedOffsets(range, offsets, onHost); } public void startWriting(Mutation mutation) @@ -125,6 +148,12 @@ public class MutationTrackingService return createSummaryForRange(Range.makeRowRange(range), tableId, includePending); } + void forEachKeyspace(Consumer<KeyspaceShards> consumer) + { + for (KeyspaceShards keyspaceShards : shards.values()) + consumer.accept(keyspaceShards); + } + private KeyspaceShards getOrCreate(TableId tableId) { //noinspection DataFlowIssue @@ -161,11 +190,13 @@ public class MutationTrackingService Preconditions.checkArgument(keyspace.params.replicationType.isTracked()); Map<Range<Token>, Shard> shards = new HashMap<>(); cluster.placements.get(keyspace.params.replication).writes.forEach((tokenRange, forRange) -> { - IntArrayList participants = new IntArrayList(forRange.size(), IntArrayList.DEFAULT_NULL_VALUE); - for (InetAddressAndPort endpoint : forRange.endpoints()) - participants.add(cluster.directory.peerId(endpoint).id()); - Shard shard = new Shard(keyspace.name, tokenRange, cluster.myNodeId().id(), new Participants(participants), forRange.lastModified(), logIdProvider); - shards.put(tokenRange, shard); + if (!forRange.endpoints().contains(FBUtilities.getBroadcastAddressAndPort())) + return; + IntArrayList participants = new IntArrayList(forRange.size(), IntArrayList.DEFAULT_NULL_VALUE); + for (InetAddressAndPort endpoint : forRange.endpoints()) + participants.add(cluster.directory.peerId(endpoint).id()); + Shard shard = new Shard(keyspace.name, tokenRange, cluster.myNodeId().id(), new Participants(participants), forRange.lastModified(), logIdProvider); + shards.put(tokenRange, shard); }); return new KeyspaceShards(keyspace.name, shards); } @@ -184,9 +215,14 @@ public class MutationTrackingService return lookUp(token).nextId(); } - void witnessedRemoteMutation(Token token, MutationId mutationId, InetAddressAndPort onHost) + void receivedWriteResponse(Token token, MutationId mutationId, InetAddressAndPort onHost) + { + lookUp(token).receivedWriteResponse(mutationId, onHost); + } + + void updateReplicatedOffsets(Range<Token> range, List<? 
extends Offsets> offsets, InetAddressAndPort onHost) { - lookUp(token).witnessedRemoteMutation(mutationId, onHost); + shards.get(range).updateReplicatedOffsets(offsets, onHost); } void startWriting(Mutation mutation) @@ -224,6 +260,12 @@ public class MutationTrackingService }); } + void forEachShard(Consumer<Shard> consumer) + { + for (Shard shard : shards.values()) + consumer.accept(shard); + } + Shard lookUp(Mutation mutation) { return lookUp(mutation.key()); @@ -242,4 +284,73 @@ public class MutationTrackingService return shards.get(range); } } + + // TODO (later): a more intelligent heuristic for offsets included in broadcasts + private static class ReplicatedOffsetsBroadcaster implements Runnable, Shutdownable + { + private static final ScheduledExecutorPlus executor = + executorFactory().scheduled("Replicated-Offsets-Broadcaster", NORMAL); + + // TODO (later): a more intelligent heuristic for scheduling broadcasts + private static final long BROADCAST_INTERVAL_MILLIS = 200; + + void start() + { + executor.scheduleWithFixedDelay(this, BROADCAST_INTERVAL_MILLIS, BROADCAST_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + } + + @Override + public boolean isTerminated() + { + return executor.isTerminated(); + } + + @Override + public void shutdown() + { + executor.shutdown(); + } + + @Override + public Object shutdownNow() + { + return executor.shutdownNow(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException + { + return executor.awaitTermination(timeout, units); + } + + @Override + public void run() + { + MutationTrackingService.instance.forEachKeyspace(this::run); + } + + private void run(KeyspaceShards shards) + { + shards.forEachShard(this::run); + } + + private void run(Shard shard) + { + ShardReplicatedOffsets replicatedOffsets = shard.collectReplicatedOffsets(); + if (replicatedOffsets.isEmpty()) + return; + + Message<ShardReplicatedOffsets> message = Message.out(Verb.BROADCAST_LOG_OFFSETS, replicatedOffsets); + + for (InetAddressAndPort target : shard.remoteReplicas()) + if (FailureDetector.instance.isAlive(target)) + MessagingService.instance().send(message, target); + } + } + + @VisibleForTesting + public void broadcastOffsetsForTesting() + { + broadcaster.run(); + } } diff --git a/src/java/org/apache/cassandra/replication/Offsets.java b/src/java/org/apache/cassandra/replication/Offsets.java index df1512617a..28e2d978f7 100644 --- a/src/java/org/apache/cassandra/replication/Offsets.java +++ b/src/java/org/apache/cassandra/replication/Offsets.java @@ -351,6 +351,7 @@ public abstract class Offsets implements Iterable<ShortMutationId> if (size == 0) { append(start, end); + onAdded.consume(logId, start, end); return true; } @@ -529,24 +530,6 @@ public abstract class Offsets implements Iterable<ShortMutationId> bounds[size++] = start; bounds[size++] = end; } - - public void append(int offset) - { - if (size == 0) - { - append(offset, offset); - return; - } - - int tail = bounds[size - 1]; - if (offset <= tail) - throw new IllegalArgumentException("Can't append " + offset + " to " + tail); - - if (offset == tail + 1) - bounds[size-1] = offset; - else - append(offset, offset); - } } public static class Mutable extends AbstractMutable<Mutable> @@ -1014,7 +997,7 @@ public abstract class Offsets implements Iterable<ShortMutationId> case VALID: state = computeNext(); if (!state.isFinished()) - break;; + break; case FINISHED: return false; default: diff --git a/src/java/org/apache/cassandra/replication/Participants.java 
b/src/java/org/apache/cassandra/replication/Participants.java index c665d63a99..494dd3adca 100644 --- a/src/java/org/apache/cassandra/replication/Participants.java +++ b/src/java/org/apache/cassandra/replication/Participants.java @@ -46,6 +46,11 @@ public class Participants return idx; } + boolean contains(int hostId) + { + return indexOf(hostId) >= 0; + } + int get(int idx) { if (idx < 0 || idx >= hosts.length) diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java index 72ff1119ca..1a8ca357a6 100644 --- a/src/java/org/apache/cassandra/replication/Shard.java +++ b/src/java/org/apache/cassandra/replication/Shard.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.replication; +import java.util.ArrayList; +import java.util.List; import java.util.function.IntSupplier; import com.google.common.base.Preconditions; @@ -30,6 +32,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.replication.CoordinatorLog.CoordinatorLogPrimary; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.membership.NodeId; import org.jctools.maps.NonBlockingHashMapLong; public class Shard @@ -45,6 +48,8 @@ public class Shard Shard(String keyspace, Range<Token> tokenRange, int localHostId, Participants participants, Epoch sinceEpoch, IntSupplier logIdProvider) { + Preconditions.checkArgument(participants.contains(localHostId)); + this.keyspace = keyspace; this.tokenRange = tokenRange; this.localHostId = localHostId; @@ -62,20 +67,27 @@ public class Shard return currentLocalLog.nextId(); } - void witnessedRemoteMutation(MutationId mutationId, InetAddressAndPort onHost) + void receivedWriteResponse(MutationId mutationId, InetAddressAndPort onHost) + { + int onHostId = ClusterMetadata.current().directory.peerId(onHost).id(); + getOrCreate(mutationId).receivedWriteResponse(mutationId, onHostId); + } + + void updateReplicatedOffsets(List<? extends Offsets> offsets, InetAddressAndPort onHost) { int onHostId = ClusterMetadata.current().directory.peerId(onHost).id(); - get(mutationId).witnessedRemoteMutation(mutationId, onHostId); + for (Offsets logOffsets : offsets) + getOrCreate(logOffsets.logId()).updateReplicatedOffsets(logOffsets, onHostId); } void startWriting(Mutation mutation) { - get(mutation.id()).startWriting(mutation); + getOrCreate(mutation.id()).startWriting(mutation); } void finishWriting(Mutation mutation) { - get(mutation.id()).finishWriting(mutation); + getOrCreate(mutation.id()).finishWriting(mutation); } void addSummaryForKey(Token token, boolean includePending, MutationSummary.Builder builder) @@ -94,6 +106,34 @@ public class Shard }); } + List<InetAddressAndPort> remoteReplicas() + { + List<InetAddressAndPort> replicas = new ArrayList<>(participants.size() - 1); + for (int i = 0, size = participants.size(); i < size; ++i) + { + int hostId = participants.get(i); + if (hostId != localHostId) + replicas.add(ClusterMetadata.current().directory.endpoint(new NodeId(hostId))); + } + return replicas; + } + + /** + * Collects replicated offsets for the logs owned by this coordinator on this shard. 
+ */ + ShardReplicatedOffsets collectReplicatedOffsets() + { + List<Offsets.Immutable> offsets = new ArrayList<>(); + for (CoordinatorLog log : logs.values()) + { + Offsets.Immutable logOffsets = log.collectReplicatedOffsets(); + if (logOffsets != null) + offsets.add(logOffsets); + } + + return new ShardReplicatedOffsets(keyspace, tokenRange, offsets); + } + /** * Creates a new coordinator log for this host. Primarily on Shard init (node startup or topology change). * Also on keyspace creation. @@ -104,13 +144,18 @@ public class Shard return new CoordinatorLog.CoordinatorLogPrimary(localHostId, logId, participants); } - private CoordinatorLog get(MutationId mutationId) + private CoordinatorLog getOrCreate(MutationId mutationId) { Preconditions.checkArgument(!mutationId.isNone()); - return get(mutationId.logId()); + return getOrCreate(mutationId.logId()); + } + + private CoordinatorLog getOrCreate(CoordinatorLogId logId) + { + return getOrCreate(logId.asLong()); } - private CoordinatorLog get(long logId) + private CoordinatorLog getOrCreate(long logId) { CoordinatorLog log = logs.get(logId); return log != null diff --git a/src/java/org/apache/cassandra/replication/ShardReplicatedOffsets.java b/src/java/org/apache/cassandra/replication/ShardReplicatedOffsets.java new file mode 100644 index 0000000000..267b63c9d2 --- /dev/null +++ b/src/java/org/apache/cassandra/replication/ShardReplicatedOffsets.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.IVerbHandler; + +public class ShardReplicatedOffsets +{ + private static final Logger logger = LoggerFactory.getLogger(ShardReplicatedOffsets.class); + + private final String keyspace; + private final Range<Token> range; + private final List<Offsets.Immutable> replicatedOffsets; + + public ShardReplicatedOffsets(String keyspace, Range<Token> range, List<Offsets.Immutable> offsets) + { + this.keyspace = keyspace; + this.range = range; + this.replicatedOffsets = offsets; + } + + boolean isEmpty() + { + return replicatedOffsets.isEmpty(); + } + + @Override + public String toString() + { + return "ShardReplicatedOffsets{" + keyspace + ", " + range + ", " + replicatedOffsets + '}'; + } + + public static final IVerbHandler<ShardReplicatedOffsets> verbHandler = message -> { + ShardReplicatedOffsets replicatedOffsets = message.payload; + logger.trace("Received replicated offsets {} from {}", replicatedOffsets, message.from()); + MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace, + replicatedOffsets.range, + replicatedOffsets.replicatedOffsets, + message.from()); + }; + + public static final IVersionedSerializer<ShardReplicatedOffsets> serializer = new IVersionedSerializer<>() + { + @Override + public void serialize(ShardReplicatedOffsets status, DataOutputPlus out, int version) throws IOException + { + out.writeUTF(status.keyspace); + AbstractBounds.tokenSerializer.serialize(status.range, out, version); + out.writeInt(status.replicatedOffsets.size()); + for (Offsets.Immutable logOffsets : status.replicatedOffsets) + Offsets.serializer.serialize(logOffsets, out, version); + } + + @Override + public ShardReplicatedOffsets deserialize(DataInputPlus in, int version) throws IOException + { + String keyspace = in.readUTF(); + Range<Token> range = (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, IPartitioner.global(), version); + int count = in.readInt(); + List<Offsets.Immutable> replicatedOffsets = new ArrayList<>(count); + for (int i = 0; i < count; ++i) + replicatedOffsets.add(Offsets.serializer.deserialize(in, version)); + return new ShardReplicatedOffsets(keyspace, range, replicatedOffsets); + } + + @Override + public long serializedSize(ShardReplicatedOffsets replicatedOffsets, int version) + { + long size = 0; + size += TypeSizes.sizeof(replicatedOffsets.keyspace); + size += AbstractBounds.tokenSerializer.serializedSize(replicatedOffsets.range, version); + size += TypeSizes.sizeof(replicatedOffsets.replicatedOffsets.size()); + for (Offsets.Immutable logOffsets : replicatedOffsets.replicatedOffsets) + size += Offsets.serializer.serializedSize(logOffsets, version); + return size; + } + }; +} diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java index 7ae7b35e77..ba58f4f1f7 100644 --- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java +++ 
b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java @@ -168,10 +168,10 @@ public class TrackedWriteRequest if (remoteDCReplicas == null) remoteDCReplicas = new HashMap<>(); - List<Replica> messages = remoteDCReplicas.get(dc); - if (messages == null) - messages = remoteDCReplicas.computeIfAbsent(dc, ignore -> new ArrayList<>(3)); // most DCs will have <= 3 replicas - messages.add(destination); + List<Replica> replicas = remoteDCReplicas.get(dc); + if (replicas == null) + replicas = remoteDCReplicas.computeIfAbsent(dc, ignore -> new ArrayList<>(3)); // most DCs will have <= 3 replicas + replicas.add(destination); } } @@ -179,8 +179,8 @@ public class TrackedWriteRequest applyMutationLocally(mutation, handler); if (localDCReplicas != null) - for (Replica destination : localDCReplicas) - MessagingService.instance().sendWriteWithCallback(message, destination, handler); + for (Replica replica : localDCReplicas) + MessagingService.instance().sendWriteWithCallback(message, replica, handler); if (remoteDCReplicas != null) { diff --git a/src/java/org/apache/cassandra/replication/Participants.java b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java similarity index 51% copy from src/java/org/apache/cassandra/replication/Participants.java copy to src/java/org/apache/cassandra/replication/UnreconciledMutations.java index c665d63a99..a84bdc5162 100644 --- a/src/java/org/apache/cassandra/replication/Participants.java +++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java @@ -17,39 +17,25 @@ */ package org.apache.cassandra.replication; -import java.util.Arrays; -import java.util.Collection; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.TableId; -public class Participants +/** + * Tracks unreconciled local mutations - the subset of all unreconciled mutations + * that have been witnessed, or are currently being written to, on the local node. 
+ */ +interface UnreconciledMutations { - private final int[] hosts; + void startWriting(Mutation mutation); - Participants(Collection<Integer> participants) - { - int i = 0; - int[] hosts = new int[participants.size()]; - for (int host : participants) hosts[i++] = host; - Arrays.sort(hosts); - this.hosts = hosts; - } + void finishWriting(Mutation mutation); - int size() - { - return hosts.length; - } + void remove(int offset); - int indexOf(int hostId) - { - int idx = Arrays.binarySearch(hosts, hostId); - if (idx < 0) - throw new IllegalArgumentException("Unknown host id " + hostId); - return idx; - } + boolean collect(Token token, TableId tableId, boolean includePending, Offsets.OffsetReciever into); - int get(int idx) - { - if (idx < 0 || idx >= hosts.length) - throw new IllegalArgumentException("Out of bounds host idx " + idx); - return hosts[idx]; - } + boolean collect(AbstractBounds<PartitionPosition> range, TableId tableId, boolean includePending, Offsets.OffsetReciever into); } diff --git a/src/java/org/apache/cassandra/replication/LocalMutationStates.java b/src/java/org/apache/cassandra/replication/UnreconciledMutationsReplica.java similarity index 92% rename from src/java/org/apache/cassandra/replication/LocalMutationStates.java rename to src/java/org/apache/cassandra/replication/UnreconciledMutationsReplica.java index ce1f13dd98..821c8db0fc 100644 --- a/src/java/org/apache/cassandra/replication/LocalMutationStates.java +++ b/src/java/org/apache/cassandra/replication/UnreconciledMutationsReplica.java @@ -37,7 +37,7 @@ import org.apache.cassandra.schema.TableId; * Tracks unreconciled local mutations - the subset of all unreconciled mutations * that have been witnessed, or are currently being written to, on the local node. */ -class LocalMutationStates +class UnreconciledMutationsReplica implements UnreconciledMutations { private final Int2ObjectHashMap<Entry> statesMap = new Int2ObjectHashMap<>(); private final SortedSet<Entry> statesSet = new TreeSet<>(Entry.comparator); @@ -115,14 +115,16 @@ class LocalMutationStates } } - void startWriting(Mutation mutation) + @Override + public void startWriting(Mutation mutation) { Entry entry = Entry.create(mutation); statesMap.put(entry.offset, entry); statesSet.add(entry); } - void finishWriting(Mutation mutation) + @Override + public void finishWriting(Mutation mutation) { Preconditions.checkArgument(!mutation.id().isNone()); Entry entry = statesMap.get(mutation.id().offset()); @@ -130,20 +132,23 @@ class LocalMutationStates entry.visibility = Visibility.VISIBLE; } - void remove(int offset) + @Override + public void remove(int offset) { Entry state = statesMap.remove(offset); Preconditions.checkNotNull(state); statesSet.remove(state); } - boolean collect(Token token, TableId tableId, boolean includePending, Offsets.OffsetReciever into) + @Override + public boolean collect(Token token, TableId tableId, boolean includePending, Offsets.OffsetReciever into) { SortedSet<Entry> subset = statesSet.subSet(Entry.start(token, true), Entry.end(token, true)); return collect(subset, tableId, includePending, into); } - boolean collect(AbstractBounds<PartitionPosition> range, TableId tableId, boolean includePending, Offsets.OffsetReciever into) + @Override + public boolean collect(AbstractBounds<PartitionPosition> range, TableId tableId, boolean includePending, Offsets.OffsetReciever into) { Entry start = Entry.start(range.left.getToken(), range.left.kind() != PartitionPosition.Kind.MAX_BOUND); Entry end = Entry.end(range.right.getToken(), 
range.right.kind() != PartitionPosition.Kind.MIN_BOUND); diff --git a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java index 56528f89c8..c3eb76a46d 100644 --- a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java @@ -56,7 +56,7 @@ public class TrackedWriteResponseHandler extends AbstractWriteResponseHandler<No { // Local mutations are witnessed from Keyspace.applyInternalTracked if (msg != null) - MutationTrackingService.instance.witnessedRemoteMutation(keyspace, token, mutationId, msg.from()); + MutationTrackingService.instance.receivedWriteResponse(keyspace, token, mutationId, msg.from()); wrapped.onResponse(msg); } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java index 3f0e3ca3a5..6a72a7a01a 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java @@ -68,7 +68,7 @@ public class ReadReconcileNotify } }; - public static final IVersionedSerializer<ReadReconcileNotify> serializer = new IVersionedSerializer<ReadReconcileNotify>() + public static final IVersionedSerializer<ReadReconcileNotify> serializer = new IVersionedSerializer<>() { @Override public void serialize(ReadReconcileNotify notify, DataOutputPlus out, int version) throws IOException diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java new file mode 100644 index 0000000000..5cd9c8cb2c --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.distributed.test.tracking; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.replication.MutationSummary; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.getOnlyLogId; + +public class OffsetBroadcastTest extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(OffsetBroadcastTest.class); + + @Test + public void testBroadcastOffsets() throws Throwable + { + try (Cluster cluster = Cluster.build(3) + .withConfig(cfg -> cfg.with(Feature.NETWORK) + .with(Feature.GOSSIP) + .set("mutation_tracking_enabled", "true")) + .start()) + { + + cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked';")); + + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int primary key, v int);")); + + String keyspaceName = KEYSPACE; + + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1, 1)"), ConsistencyLevel.QUORUM); + + for (int i = 1; i <= cluster.size(); ++i) + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting()); + + for (int i = 1; i <= cluster.size(); ++i) + { + cluster.get(i).runOnInstance(() -> { + TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); + DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); + MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary.CoordinatorSummary coordinatorSummary = summary.get(getOnlyLogId(summary)); + Assert.assertEquals(1, coordinatorSummary.reconciled.offsetCount()); + Assert.assertEquals(0, coordinatorSummary.unreconciled.offsetCount()); + }); + } + } + } +} diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java index 37bc1cd080..f89f1a521b 100644 --- a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java +++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java @@ -68,7 +68,7 @@ public class CoordinatorLogTest { Offsets.Mutable list = new Offsets.Mutable(LOG_ID); for (MutationId id : ids) - list.append(id.offset()); + list.add(id.offset()); return list; } @@ -119,10 +119,10 @@ public class CoordinatorLogTest // the call to finishWriting will have made the ids visible without the includePending flag assertUnreconciled(tk, tableId, log, false, reconciled, ids); - log.witnessedRemoteMutation(ids[0], PARTICIPANTS.get(1)); + log.receivedWriteResponse(ids[0], PARTICIPANTS.get(1)); assertUnreconciled(tk, tableId, log, false, reconciled, ids); - log.witnessedRemoteMutation(ids[0], PARTICIPANTS.get(2)); + log.receivedWriteResponse(ids[0], PARTICIPANTS.get(2)); reconciled.add(ids[0].offset()); assertUnreconciled(tk, tableId, 
log, false, reconciled, ids[1], ids[2]); } } diff --git a/test/unit/org/apache/cassandra/replication/OffsetsTest.java b/test/unit/org/apache/cassandra/replication/OffsetsTest.java index 539da7d1b0..1d114a62f2 100644 --- a/test/unit/org/apache/cassandra/replication/OffsetsTest.java +++ b/test/unit/org/apache/cassandra/replication/OffsetsTest.java @@ -510,22 +510,22 @@ public class OffsetsTest public void appendTest() { Offsets.Mutable ids = new Offsets.Mutable(LOG_ID); - ids.append(5); + ids.add(5); assertEquals(1, ids.rangeCount()); assertEquals(1, ids.offsetCount()); - ids.append(6); + ids.add(6); assertEquals(1, ids.rangeCount()); assertEquals(2, ids.offsetCount()); - ids.append(8); + ids.add(8); assertEquals(2, ids.rangeCount()); assertEquals(3, ids.offsetCount()); // insert before tail try { - ids.append(8); + ids.add(8); Assert.fail(); } catch (IllegalArgumentException e) { @@ -538,7 +538,7 @@ // insert before tail try { - ids.append(7); + ids.add(7); Assert.fail(); } catch (IllegalArgumentException e)