This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit b9ac1602d9d8d55dabbefe044560d7564cf14952
Author: Aleksey Yeschenko <alek...@apache.org>
AuthorDate: Fri Apr 4 12:38:28 2025 +0100

    Implement coordinator log offset broadcasting
    
    patch by Aleksey Yeschenko; reviewed by Blake Eggleston for CASSANDRA-20576
---
 src/java/org/apache/cassandra/net/Verb.java        |   2 +
 .../cassandra/replication/CoordinatorLog.java      | 157 ++++++++++++++-------
 .../cassandra/replication/CoordinatorLogId.java    |  17 ++-
 .../cassandra/replication/ForwardedWrite.java      |   2 +-
 .../cassandra/replication/MutationSummary.java     |   8 ++
 .../replication/MutationTrackingService.java       | 129 +++++++++++++++--
 .../org/apache/cassandra/replication/Offsets.java  |  21 +--
 .../apache/cassandra/replication/Participants.java |   5 +
 .../org/apache/cassandra/replication/Shard.java    |  59 +++++++-
 .../replication/ShardReplicatedOffsets.java        | 108 ++++++++++++++
 .../cassandra/replication/TrackedWriteRequest.java |  12 +-
 ...articipants.java => UnreconciledMutations.java} |  44 ++----
 ...ates.java => UnreconciledMutationsReplica.java} |  17 ++-
 .../service/TrackedWriteResponseHandler.java       |   2 +-
 .../service/reads/tracked/ReadReconcileNotify.java |   2 +-
 .../test/tracking/OffsetBroadcastTest.java         |  79 +++++++++++
 .../cassandra/replication/CoordinatorLogTest.java  |   6 +-
 .../apache/cassandra/replication/OffsetsTest.java  |  10 +-
 18 files changed, 541 insertions(+), 139 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index b96978ae7e..af287d6305 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -73,6 +73,7 @@ import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.repair.messages.ValidationResponse;
 import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.replication.ForwardedWrite;
+import org.apache.cassandra.replication.ShardReplicatedOffsets;
 import org.apache.cassandra.schema.SchemaMutationsSerializer;
 import org.apache.cassandra.schema.SchemaPullVerbHandler;
 import org.apache.cassandra.schema.SchemaPushVerbHandler;
@@ -249,6 +250,7 @@ public enum Verb
     READ_RECONCILE_RCV         (902, P0, rpcTimeout,   MUTATION,         () -> ReadReconcileReceive.serializer,       () -> ReadReconcileReceive.verbHandler                           ),
     READ_RECONCILE_NOTIFY      (903, P0, rpcTimeout,   REQUEST_RESPONSE, () -> ReadReconcileNotify.serializer,        () -> ReadReconcileNotify.verbHandler                            ),
     FORWARDING_WRITE           (904, P3, writeTimeout, MUTATION,         () -> ForwardedWrite.Request.serializer,     () -> ForwardedWrite.verbHandler),
+    BROADCAST_LOG_OFFSETS      (905, P1, rpcTimeout,   MISC,             () -> ShardReplicatedOffsets.serializer,     () -> ShardReplicatedOffsets.verbHandler),
 
     TRACKED_PARTITION_READ_RSP (905, P2, readTimeout,  REQUEST_RESPONSE, () -> TrackedDataResponse.serializer,        () -> ResponseVerbHandler.instance                               ),
     TRACKED_PARTITION_READ_REQ (906, P3, readTimeout,  READ,             () -> TrackedRead.DataRequest.serializer,    () -> TrackedRead.verbHandler,           TRACKED_PARTITION_READ_RSP),
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 74712fd968..ea21a99a31 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -21,15 +21,17 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.schema.TableId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 
@@ -41,29 +43,26 @@ public abstract class CoordinatorLog
     protected final CoordinatorLogId logId;
     protected final Participants participants;
 
-    /**
-     * State machines and an Id <-> token index for unreconciled mutation ids that exist oh this host.
-     */
-    private final LocalMutationStates unreconciledMutations;
+    protected final Offsets.Mutable[] witnessedOffsets;
+    protected final Offsets.Mutable reconciledOffsets;
 
-    protected final Offsets.Mutable[] witnessedIds;
-    protected final Offsets.Mutable reconciledIds;
     protected final ReadWriteLock lock;
 
+    abstract UnreconciledMutations unreconciledMutations();
+
     CoordinatorLog(int localHostId, CoordinatorLogId logId, Participants participants)
     {
         this.localHostId = localHostId;
         this.logId = logId;
         this.participants = participants;
-        this.unreconciledMutations = new LocalMutationStates();
         this.lock = new ReentrantReadWriteLock();
 
         Offsets.Mutable[] ids = new Offsets.Mutable[participants.size()];
         for (int i = 0; i < participants.size(); i++)
             ids[i] = new Offsets.Mutable(logId);
 
-        witnessedIds = ids;
-        reconciledIds = new Offsets.Mutable(logId);
+        witnessedOffsets = ids;
+        reconciledOffsets = new Offsets.Mutable(logId);
     }
 
     static CoordinatorLog create(int localHostId, CoordinatorLogId id, Participants participants)
@@ -72,7 +71,7 @@ public abstract class CoordinatorLog
                                         : new CoordinatorLogReplica(localHostId, id, participants);
     }
 
-    void witnessedRemoteMutation(MutationId mutationId, int onHostId)
+    void receivedWriteResponse(MutationId mutationId, int onHostId)
     {
         Preconditions.checkArgument(!mutationId.isNone());
         logger.trace("witnessed remote mutation {} from {}", mutationId, onHostId);
@@ -80,26 +79,17 @@ public abstract class CoordinatorLog
         try
         {
             if (!get(onHostId).add(mutationId.offset()))
-                return; // already witnessed
+                return; // already witnessed; very uncommon but possible path
 
             if (!getLocal().contains(mutationId.offset()))
-                return; // local host hasn't witnessed -> no cleanup needed
+                return; // local host hasn't witnessed yet -> no cleanup needed
 
-            // see if any other replicas haven't witnessed the id yet
-            boolean allOtherReplicasWitnessed = true;
-            for (int i = 0; i < participants.size() && allOtherReplicasWitnessed; i++)
-            {
-                int hostId = participants.get(i);
-                if (hostId != onHostId && hostId != localHostId && !get(hostId).contains(mutationId.offset()))
-                    allOtherReplicasWitnessed = false;
-            }
-
-            if (allOtherReplicasWitnessed)
+            if (hasWrittenToRemoteReplicas(mutationId.offset()))
             {
                 logger.trace("marking mutation {} as fully reconciled", mutationId);
                 // if all replicas have now witnessed the id, remove it from the index
-                unreconciledMutations.remove(mutationId.offset());
-                reconciledIds.add(mutationId.offset());
+                unreconciledMutations().remove(mutationId.offset());
+                reconciledOffsets.add(mutationId.offset());
             }
         }
         finally
@@ -108,6 +98,46 @@ public abstract class CoordinatorLog
         }
     }
 
+    void updateReplicatedOffsets(Offsets offsets, int onHostId)
+    {
+        lock.writeLock().lock();
+        try
+        {
+            get(onHostId).addAll(offsets, (ignore, start, end) ->
+            {
+                for (int offset = start; offset <= end; ++offset)
+                {
+                    // TODO (desired): skip checking the host's offsets - all just added
+                    // TODO (desired): use the fact that Offsets are ordered to optimise this look up
+                    if (hasWrittenLocally(offset) && hasWrittenToRemoteReplicas(offset))
+                    {
+                        reconciledOffsets.add(offset);
+                        unreconciledMutations().remove(offset);
+                    }
+                }
+            });
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @Nullable
+    Offsets.Immutable collectReplicatedOffsets()
+    {
+        lock.readLock().lock();
+        try
+        {
+            Offsets offsets = witnessedOffsets[participants.indexOf(localHostId)];
+            return offsets.isEmpty() ? null : Offsets.Immutable.copy(offsets);
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     void startWriting(Mutation mutation)
     {
         lock.writeLock().lock();
@@ -116,7 +146,7 @@ public abstract class CoordinatorLog
             if (getLocal().contains(mutation.id().offset()))
                 return; // already witnessed; shouldn't get to this path often (duplicate mutation)
 
-            unreconciledMutations.startWriting(mutation);
+            unreconciledMutations().startWriting(mutation);
         }
         finally
         {
@@ -127,27 +157,21 @@ public abstract class CoordinatorLog
     void finishWriting(Mutation mutation)
     {
         logger.trace("witnessed local mutation {}", mutation.id());
+
         lock.writeLock().lock();
         try
         {
-            if (!getLocal().add(mutation.id().offset()))
-                throw new IllegalStateException("finishWriting() called on a reconciled mutation");
+            int offset = mutation.id().offset();
+            if (!getLocal().add(offset))
+                throw new IllegalStateException("finishWriting() called on a locally witnessed mutation " + mutation.id());
+
+            unreconciledMutations().finishWriting(mutation);
 
-            // see if any other replicas haven't witnessed the id yet
-            boolean allOtherReplicasWitnessed = true;
-            for (int i = 0; i < participants.size() && allOtherReplicasWitnessed; i++)
+            if (hasWrittenToRemoteReplicas(offset))
             {
-                int hostId = participants.get(i);
-                if (hostId != localHostId && !get(hostId).contains(mutation.id().offset()))
-                    allOtherReplicasWitnessed = false;
+                reconciledOffsets.add(offset);
+                unreconciledMutations().remove(offset);
             }
-
-            // if some replicas also haven't witnessed the mutation yet, we should update local mutation state;
-            // otherwise we are the last node to witness this mutation, and can clean it up
-            if (allOtherReplicasWitnessed)
-                reconciledIds.add(mutation.id().offset());
-            else
-                unreconciledMutations.finishWriting(mutation);
         }
         finally
         {
@@ -155,6 +179,22 @@ public abstract class CoordinatorLog
         }
     }
 
+    private boolean hasWrittenLocally(int offset)
+    {
+        return getLocal().contains(offset);
+    }
+
+    private boolean hasWrittenToRemoteReplicas(int offset)
+    {
+        for (int i = 0; i < participants.size(); ++i)
+        {
+            int hostId = participants.get(i);
+            if (hostId != localHostId && !get(hostId).contains(offset))
+                return false;
+        }
+        return true;
+    }
+
     /**
      * Look up unreconciled sequence ids of mutations witnessed by this host in this coordinator log.
      * Adds the ids to the supplied collection, so it can be reused to aggregate lookups for multiple logs.
@@ -164,8 +204,8 @@ public abstract class CoordinatorLog
         lock.readLock().lock();
         try
         {
-            reconciledInto.addAll(reconciledIds);
-            return unreconciledMutations.collect(token, tableId, includePending, unreconciledInto);
+            reconciledInto.addAll(reconciledOffsets);
+            return unreconciledMutations().collect(token, tableId, includePending, unreconciledInto);
         }
         finally
         {
@@ -182,8 +222,8 @@ public abstract class CoordinatorLog
         lock.readLock().lock();
         try
         {
-            reconciledInto.addAll(reconciledIds);
-            return unreconciledMutations.collect(range, tableId, includePending, unreconciledInto);
+            reconciledInto.addAll(reconciledOffsets);
+            return unreconciledMutations().collect(range, tableId, includePending, unreconciledInto);
         }
         finally
         {
@@ -193,21 +233,29 @@ public abstract class CoordinatorLog
 
     protected Offsets.Mutable get(int hostId)
     {
-        return witnessedIds[participants.indexOf(hostId)];
+        return witnessedOffsets[participants.indexOf(hostId)];
     }
 
     protected Offsets.Mutable getLocal()
     {
-        return witnessedIds[participants.indexOf(localHostId)];
+        return witnessedOffsets[participants.indexOf(localHostId)];
     }
 
     public static class CoordinatorLogPrimary extends CoordinatorLog
     {
-        AtomicLong sequenceId = new AtomicLong(-1);
+        private final AtomicLong sequenceId = new AtomicLong(-1);
+        private final UnreconciledMutationsReplica unreconciledMutations;
 
         CoordinatorLogPrimary(int localHostId, CoordinatorLogId logId, Participants participants)
         {
             super(localHostId, logId, participants);
+            unreconciledMutations = new UnreconciledMutationsReplica();
+        }
+
+        @Override
+        UnreconciledMutationsReplica unreconciledMutations()
+        {
+            return unreconciledMutations;
         }
 
         MutationId nextId()
@@ -235,9 +283,18 @@ public abstract class CoordinatorLog
 
     public static class CoordinatorLogReplica extends CoordinatorLog
     {
+        private final UnreconciledMutationsReplica unreconciledMutations;
+
         CoordinatorLogReplica(int localHostId, CoordinatorLogId logId, Participants participants)
         {
             super(localHostId, logId, participants);
+            this.unreconciledMutations = new UnreconciledMutationsReplica();
+        }
+
+        @Override
+        UnreconciledMutationsReplica unreconciledMutations()
+        {
+            return unreconciledMutations;
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
index 37d4f25851..f7dd55cce7 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
@@ -118,7 +118,7 @@ public class CoordinatorLogId implements Serializable
 
     public static final Comparator<CoordinatorLogId> comparator = (l, r) -> Long.compareUnsigned(l.asLong(), r.asLong());
 
-    public static final IVersionedSerializer<CoordinatorLogId> serializer = new IVersionedSerializer<>()
+    static final class Serializer implements IVersionedSerializer<CoordinatorLogId>
     {
         @Override
         public void serialize(CoordinatorLogId logId, DataOutputPlus out, int version) throws IOException
@@ -127,6 +127,12 @@ public class CoordinatorLogId implements Serializable
             out.writeInt(logId.hostLogId);
         }
 
+        public void serialize(long logId, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeInt(hostId(logId));
+            out.writeInt(hostLogId(logId));
+        }
+
         @Override
         public CoordinatorLogId deserialize(DataInputPlus in, int version) throws IOException
         {
@@ -142,5 +148,12 @@ public class CoordinatorLogId implements Serializable
         {
             return TypeSizes.sizeof(logId.hostId) + TypeSizes.sizeof(logId.hostLogId);
         }
-    };
+
+        public long serializedSize(long logId, int version)
+        {
+            return TypeSizes.sizeof(logId);
+        }
+    }
+
+    static final Serializer serializer = new Serializer();
 }
diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
index 51a072a423..41153e307f 100644
--- a/src/java/org/apache/cassandra/replication/ForwardedWrite.java
+++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
@@ -402,7 +402,7 @@ public class ForwardedWrite
         {
             // Local mutations are witnessed from Keyspace.applyInternalTracked
             if (msg != null)
-                MutationTrackingService.instance.witnessedRemoteMutation(keyspace, token, id, msg.from());
+                MutationTrackingService.instance.receivedWriteResponse(keyspace, token, id, msg.from());
 
             // Local write needs to be ack'd to coordinator
             if (msg == null && ackTo != null)
diff --git a/src/java/org/apache/cassandra/replication/MutationSummary.java b/src/java/org/apache/cassandra/replication/MutationSummary.java
index 53a18d1046..c786b295fe 100644
--- a/src/java/org/apache/cassandra/replication/MutationSummary.java
+++ b/src/java/org/apache/cassandra/replication/MutationSummary.java
@@ -255,6 +255,14 @@ public class MutationSummary
         return count;
     }
 
+    public int reconciledIds()
+    {
+        int count = 0;
+        for (CoordinatorSummary summary : summaries)
+            count += summary.reconciled.offsetCount();
+        return count;
+    }
+
     public int size()
     {
         return summaries.size();
diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 3e48840531..efff030e7b 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -18,30 +18,44 @@
 package org.apache.cassandra.replication;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.IntSupplier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.Shutdownable;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.reads.tracked.TrackedLocalReads;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.ExecutorFactory.SimulatorSemantics.NORMAL;
+
 // TODO (expected): persistence (handle restarts)
 // TODO (expected): handle topology changes
 public class MutationTrackingService
@@ -50,6 +64,7 @@ public class MutationTrackingService
     public static final MutationTrackingService instance = new MutationTrackingService();
 
     private final TrackedLocalReads localReads = new TrackedLocalReads();
+    private final ReplicatedOffsetsBroadcaster broadcaster = new ReplicatedOffsetsBroadcaster();
     private final ConcurrentHashMap<String, KeyspaceShards> shards = new ConcurrentHashMap<>();
 
     private volatile boolean started = false;
@@ -67,6 +82,9 @@ public class MutationTrackingService
         for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
             if (keyspace.useMutationTracking())
                 shards.put(keyspace.name, KeyspaceShards.make(keyspace, metadata, this::nextHostLogId));
+
+        broadcaster.start();
+
         started = true;
     }
 
@@ -92,10 +110,15 @@ public class MutationTrackingService
         return id;
     }
 
-    public void witnessedRemoteMutation(String keyspace, Token token, MutationId mutationId, InetAddressAndPort onHost)
+    public void receivedWriteResponse(String keyspace, Token token, MutationId mutationId, InetAddressAndPort onHost)
     {
         Preconditions.checkArgument(!mutationId.isNone());
-        getOrCreate(keyspace).witnessedRemoteMutation(token, mutationId, onHost);
+        getOrCreate(keyspace).receivedWriteResponse(token, mutationId, onHost);
+    }
+
+    public void updateReplicatedOffsets(String keyspace, Range<Token> range, List<? extends Offsets> offsets, InetAddressAndPort onHost)
+    {
+        getOrCreate(keyspace).updateReplicatedOffsets(range, offsets, onHost);
     }
 
     public void startWriting(Mutation mutation)
@@ -125,6 +148,12 @@ public class MutationTrackingService
         return createSummaryForRange(Range.makeRowRange(range), tableId, includePending);
     }
 
+    void forEachKeyspace(Consumer<KeyspaceShards> consumer)
+    {
+        for (KeyspaceShards keyspaceShards : shards.values())
+            consumer.accept(keyspaceShards);
+    }
+
     private KeyspaceShards getOrCreate(TableId tableId)
     {
         //noinspection DataFlowIssue
@@ -161,11 +190,13 @@ public class MutationTrackingService
             Preconditions.checkArgument(keyspace.params.replicationType.isTracked());
             Map<Range<Token>, Shard> shards = new HashMap<>();
             cluster.placements.get(keyspace.params.replication).writes.forEach((tokenRange, forRange) -> {
-               IntArrayList participants = new IntArrayList(forRange.size(), IntArrayList.DEFAULT_NULL_VALUE);
-               for (InetAddressAndPort endpoint : forRange.endpoints())
-                   participants.add(cluster.directory.peerId(endpoint).id());
-               Shard shard = new Shard(keyspace.name, tokenRange, cluster.myNodeId().id(), new Participants(participants), forRange.lastModified(), logIdProvider);
-               shards.put(tokenRange, shard);
+                if (!forRange.endpoints().contains(FBUtilities.getBroadcastAddressAndPort()))
+                    return;
+                IntArrayList participants = new IntArrayList(forRange.size(), IntArrayList.DEFAULT_NULL_VALUE);
+                for (InetAddressAndPort endpoint : forRange.endpoints())
+                    participants.add(cluster.directory.peerId(endpoint).id());
+                Shard shard = new Shard(keyspace.name, tokenRange, cluster.myNodeId().id(), new Participants(participants), forRange.lastModified(), logIdProvider);
+                shards.put(tokenRange, shard);
             });
             return new KeyspaceShards(keyspace.name, shards);
         }
@@ -184,9 +215,14 @@ public class MutationTrackingService
             return lookUp(token).nextId();
         }
 
-        void witnessedRemoteMutation(Token token, MutationId mutationId, InetAddressAndPort onHost)
+        void receivedWriteResponse(Token token, MutationId mutationId, InetAddressAndPort onHost)
+        {
+            lookUp(token).receivedWriteResponse(mutationId, onHost);
+        }
+
+        void updateReplicatedOffsets(Range<Token> range, List<? extends Offsets> offsets, InetAddressAndPort onHost)
         {
-            lookUp(token).witnessedRemoteMutation(mutationId, onHost);
+            shards.get(range).updateReplicatedOffsets(offsets, onHost);
         }
 
         void startWriting(Mutation mutation)
@@ -224,6 +260,12 @@ public class MutationTrackingService
             });
         }
 
+        void forEachShard(Consumer<Shard> consumer)
+        {
+            for (Shard shard : shards.values())
+                consumer.accept(shard);
+        }
+
         Shard lookUp(Mutation mutation)
         {
             return lookUp(mutation.key());
@@ -242,4 +284,73 @@ public class MutationTrackingService
             return shards.get(range);
         }
     }
+
+    // TODO (later): a more intelligent heuristic for offsets included in broadcasts
+    private static class ReplicatedOffsetsBroadcaster implements Runnable, Shutdownable
+    {
+        private static final ScheduledExecutorPlus executor =
+            executorFactory().scheduled("Replicated-Offsets-Broadcaster", NORMAL);
+
+        // TODO (later): a more intelligent heuristic for scheduling broadcasts
+        private static final long BROADCAST_INTERVAL_MILLIS = 200;
+
+        void start()
+        {
+            executor.scheduleWithFixedDelay(this, BROADCAST_INTERVAL_MILLIS, BROADCAST_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public boolean isTerminated()
+        {
+            return executor.isTerminated();
+        }
+
+        @Override
+        public void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        @Override
+        public Object shutdownNow()
+        {
+            return executor.shutdownNow();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException
+        {
+            return executor.awaitTermination(timeout, units);
+        }
+
+        @Override
+        public void run()
+        {
+            MutationTrackingService.instance.forEachKeyspace(this::run);
+        }
+
+        private void run(KeyspaceShards shards)
+        {
+            shards.forEachShard(this::run);
+        }
+
+        private void run(Shard shard)
+        {
+            ShardReplicatedOffsets replicatedOffsets = shard.collectReplicatedOffsets();
+            if (replicatedOffsets.isEmpty())
+                return;
+
+            Message<ShardReplicatedOffsets> message = Message.out(Verb.BROADCAST_LOG_OFFSETS, replicatedOffsets);
+
+            for (InetAddressAndPort target : shard.remoteReplicas())
+                if (FailureDetector.instance.isAlive(target))
+                    MessagingService.instance().send(message, target);
+        }
+    }
+
+    @VisibleForTesting
+    public void broadcastOffsetsForTesting()
+    {
+        broadcaster.run();
+    }
 }
diff --git a/src/java/org/apache/cassandra/replication/Offsets.java b/src/java/org/apache/cassandra/replication/Offsets.java
index df1512617a..28e2d978f7 100644
--- a/src/java/org/apache/cassandra/replication/Offsets.java
+++ b/src/java/org/apache/cassandra/replication/Offsets.java
@@ -351,6 +351,7 @@ public abstract class Offsets implements Iterable<ShortMutationId>
             if (size == 0)
             {
                 append(start, end);
+                onAdded.consume(logId, start, end);
                 return true;
             }
 
@@ -529,24 +530,6 @@ public abstract class Offsets implements Iterable<ShortMutationId>
             bounds[size++] = start;
             bounds[size++] = end;
         }
-
-        public void append(int offset)
-        {
-            if (size == 0)
-            {
-                append(offset, offset);
-                return;
-            }
-
-            int tail = bounds[size - 1];
-            if (offset <= tail)
-                throw new IllegalArgumentException("Can't append " + offset + " to " + tail);
-
-            if (offset == tail + 1)
-                bounds[size-1] = offset;
-            else
-                append(offset, offset);
-        }
     }
 
     public static class Mutable extends AbstractMutable<Mutable>
@@ -1014,7 +997,7 @@ public abstract class Offsets implements Iterable<ShortMutationId>
                 case VALID:
                     state = computeNext();
                     if (!state.isFinished())
-                        break;;
+                        break;
                 case FINISHED:
                     return false;
                 default:
diff --git a/src/java/org/apache/cassandra/replication/Participants.java b/src/java/org/apache/cassandra/replication/Participants.java
index c665d63a99..494dd3adca 100644
--- a/src/java/org/apache/cassandra/replication/Participants.java
+++ b/src/java/org/apache/cassandra/replication/Participants.java
@@ -46,6 +46,11 @@ public class Participants
         return idx;
     }
 
+    boolean contains(int hostId)
+    {
+        return indexOf(hostId) >= 0;
+    }
+
     int get(int idx)
     {
         if (idx < 0 || idx >= hosts.length)
diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java
index 72ff1119ca..1a8ca357a6 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.replication;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.function.IntSupplier;
 
 import com.google.common.base.Preconditions;
@@ -30,6 +32,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.replication.CoordinatorLog.CoordinatorLogPrimary;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.membership.NodeId;
 import org.jctools.maps.NonBlockingHashMapLong;
 
 public class Shard
@@ -45,6 +48,8 @@ public class Shard
 
     Shard(String keyspace, Range<Token> tokenRange, int localHostId, Participants participants, Epoch sinceEpoch, IntSupplier logIdProvider)
     {
+        Preconditions.checkArgument(participants.contains(localHostId));
+
         this.keyspace = keyspace;
         this.tokenRange = tokenRange;
         this.localHostId = localHostId;
@@ -62,20 +67,27 @@ public class Shard
         return currentLocalLog.nextId();
     }
 
-    void witnessedRemoteMutation(MutationId mutationId, InetAddressAndPort onHost)
+    void receivedWriteResponse(MutationId mutationId, InetAddressAndPort onHost)
+    {
+        int onHostId = ClusterMetadata.current().directory.peerId(onHost).id();
+        getOrCreate(mutationId).receivedWriteResponse(mutationId, onHostId);
+    }
+
+    void updateReplicatedOffsets(List<? extends Offsets> offsets, InetAddressAndPort onHost)
     {
         int onHostId = ClusterMetadata.current().directory.peerId(onHost).id();
-        get(mutationId).witnessedRemoteMutation(mutationId, onHostId);
+        for (Offsets logOffsets : offsets)
+            getOrCreate(logOffsets.logId()).updateReplicatedOffsets(logOffsets, onHostId);
     }
 
     void startWriting(Mutation mutation)
     {
-        get(mutation.id()).startWriting(mutation);
+        getOrCreate(mutation.id()).startWriting(mutation);
     }
 
     void finishWriting(Mutation mutation)
     {
-        get(mutation.id()).finishWriting(mutation);
+        getOrCreate(mutation.id()).finishWriting(mutation);
     }
 
     void addSummaryForKey(Token token, boolean includePending, MutationSummary.Builder builder)
@@ -94,6 +106,34 @@ public class Shard
         });
     }
 
+    List<InetAddressAndPort> remoteReplicas()
+    {
+        List<InetAddressAndPort> replicas = new ArrayList<>(participants.size() - 1);
+        for (int i = 0, size = participants.size(); i < size; ++i)
+        {
+            int hostId = participants.get(i);
+            if (hostId != localHostId)
+                replicas.add(ClusterMetadata.current().directory.endpoint(new NodeId(hostId)));
+        }
+        return replicas;
+    }
+
+    /**
+     * Collects replicated offsets for the logs owned by this coordinator on this shard.
+     */
+    ShardReplicatedOffsets collectReplicatedOffsets()
+    {
+        List<Offsets.Immutable> offsets = new ArrayList<>();
+        for (CoordinatorLog log : logs.values())
+        {
+            Offsets.Immutable logOffsets = log.collectReplicatedOffsets();
+            if (logOffsets != null)
+                offsets.add(logOffsets);
+        }
+
+        return new ShardReplicatedOffsets(keyspace, tokenRange, offsets);
+    }
+
     /**
      * Creates a new coordinator log for this host. Primarily on Shard init (node startup or topology change).
      * Also on keyspace creation.
@@ -104,13 +144,18 @@ public class Shard
         return new CoordinatorLog.CoordinatorLogPrimary(localHostId, logId, participants);
     }
 
-    private CoordinatorLog get(MutationId mutationId)
+    private CoordinatorLog getOrCreate(MutationId mutationId)
     {
         Preconditions.checkArgument(!mutationId.isNone());
-        return get(mutationId.logId());
+        return getOrCreate(mutationId.logId());
+    }
+
+    private CoordinatorLog getOrCreate(CoordinatorLogId logId)
+    {
+        return getOrCreate(logId.asLong());
     }
 
-    private CoordinatorLog get(long logId)
+    private CoordinatorLog getOrCreate(long logId)
     {
         CoordinatorLog log = logs.get(logId);
         return log != null
diff --git a/src/java/org/apache/cassandra/replication/ShardReplicatedOffsets.java b/src/java/org/apache/cassandra/replication/ShardReplicatedOffsets.java
new file mode 100644
index 0000000000..267b63c9d2
--- /dev/null
+++ b/src/java/org/apache/cassandra/replication/ShardReplicatedOffsets.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.IVerbHandler;
+
+public class ShardReplicatedOffsets
+{
+    private static final Logger logger = LoggerFactory.getLogger(ShardReplicatedOffsets.class);
+
+    private final String keyspace;
+    private final Range<Token> range;
+    private final List<Offsets.Immutable> replicatedOffsets;
+
+    public ShardReplicatedOffsets(String keyspace, Range<Token> range, List<Offsets.Immutable> offsets)
+    {
+        this.keyspace = keyspace;
+        this.range = range;
+        this.replicatedOffsets = offsets;
+    }
+
+    boolean isEmpty()
+    {
+        return replicatedOffsets.isEmpty();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ShardReplicatedOffsets{" + keyspace + ", " + range + ", " + replicatedOffsets + '}';
+    }
+
+    public static final IVerbHandler<ShardReplicatedOffsets> verbHandler = message -> {
+        ShardReplicatedOffsets replicatedOffsets = message.payload;
+        logger.trace("Received replicated offsets {} from {}", replicatedOffsets, message.from());
+        MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace,
+                                                                 replicatedOffsets.range,
+                                                                 replicatedOffsets.replicatedOffsets,
+                                                                 message.from());
+    };
+
+    public static final IVersionedSerializer<ShardReplicatedOffsets> serializer = new IVersionedSerializer<>()
+    {
+        @Override
+        public void serialize(ShardReplicatedOffsets status, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeUTF(status.keyspace);
+            AbstractBounds.tokenSerializer.serialize(status.range, out, version);
+            out.writeInt(status.replicatedOffsets.size());
+            for (Offsets.Immutable logOffsets : status.replicatedOffsets)
+                Offsets.serializer.serialize(logOffsets, out, version);
+        }
+
+        @Override
+        public ShardReplicatedOffsets deserialize(DataInputPlus in, int version) throws IOException
+        {
+            String keyspace = in.readUTF();
+            Range<Token> range = (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, IPartitioner.global(), version);
+            int count = in.readInt();
+            List<Offsets.Immutable> replicatedOffsets = new ArrayList<>(count);
+            for (int i = 0; i < count; ++i)
+                replicatedOffsets.add(Offsets.serializer.deserialize(in, version));
+            return new ShardReplicatedOffsets(keyspace, range, replicatedOffsets);
+        }
+
+        @Override
+        public long serializedSize(ShardReplicatedOffsets replicatedOffsets, int version)
+        {
+            long size = 0;
+            size += TypeSizes.sizeof(replicatedOffsets.keyspace);
+            size += AbstractBounds.tokenSerializer.serializedSize(replicatedOffsets.range, version);
+            size += TypeSizes.sizeof(replicatedOffsets.replicatedOffsets.size());
+            for (Offsets.Immutable logOffsets : replicatedOffsets.replicatedOffsets)
+                size += Offsets.serializer.serializedSize(logOffsets, version);
+            return size;
+        }
+    };
+}
diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
index 7ae7b35e77..ba58f4f1f7 100644
--- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
+++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
@@ -168,10 +168,10 @@ public class TrackedWriteRequest
                 if (remoteDCReplicas == null)
                     remoteDCReplicas = new HashMap<>();
 
-                List<Replica> messages = remoteDCReplicas.get(dc);
-                if (messages == null)
-                    messages = remoteDCReplicas.computeIfAbsent(dc, ignore -> new ArrayList<>(3)); // most DCs will have <= 3 replicas
-                messages.add(destination);
+                List<Replica> replicas = remoteDCReplicas.get(dc);
+                if (replicas == null)
+                    replicas = remoteDCReplicas.computeIfAbsent(dc, ignore -> new ArrayList<>(3)); // most DCs will have <= 3 replicas
+                replicas.add(destination);
             }
         }
 
@@ -179,8 +179,8 @@ public class TrackedWriteRequest
         applyMutationLocally(mutation, handler);
 
         if (localDCReplicas != null)
-            for (Replica destination : localDCReplicas)
-                MessagingService.instance().sendWriteWithCallback(message, destination, handler);
+            for (Replica replica : localDCReplicas)
+                MessagingService.instance().sendWriteWithCallback(message, replica, handler);
 
         if (remoteDCReplicas != null)
         {
diff --git a/src/java/org/apache/cassandra/replication/Participants.java b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
similarity index 51%
copy from src/java/org/apache/cassandra/replication/Participants.java
copy to src/java/org/apache/cassandra/replication/UnreconciledMutations.java
index c665d63a99..a84bdc5162 100644
--- a/src/java/org/apache/cassandra/replication/Participants.java
+++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
@@ -17,39 +17,25 @@
  */
 package org.apache.cassandra.replication;
 
-import java.util.Arrays;
-import java.util.Collection;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.TableId;
 
-public class Participants
+/**
+ * Tracks unreconciled local mutations - the subset of all unreconciled mutations
+ * that have been witnessed, or are currently being written to, on the local node.
+ */
+interface UnreconciledMutations
 {
-    private final int[] hosts;
+    void startWriting(Mutation mutation);
 
-    Participants(Collection<Integer> participants)
-    {
-        int i = 0;
-        int[] hosts = new int[participants.size()];
-        for (int host : participants) hosts[i++] = host;
-        Arrays.sort(hosts);
-        this.hosts = hosts;
-    }
+    void finishWriting(Mutation mutation);
 
-    int size()
-    {
-        return hosts.length;
-    }
+    void remove(int offset);
 
-    int indexOf(int hostId)
-    {
-        int idx = Arrays.binarySearch(hosts, hostId);
-        if (idx < 0)
-            throw new IllegalArgumentException("Unknown host id " + hostId);
-        return idx;
-    }
+    boolean collect(Token token, TableId tableId, boolean includePending, Offsets.OffsetReciever into);
 
-    int get(int idx)
-    {
-        if (idx < 0 || idx >= hosts.length)
-            throw new IllegalArgumentException("Out of bounds host idx " + idx);
-        return hosts[idx];
-    }
+    boolean collect(AbstractBounds<PartitionPosition> range, TableId tableId, boolean includePending, Offsets.OffsetReciever into);
 }
diff --git a/src/java/org/apache/cassandra/replication/LocalMutationStates.java b/src/java/org/apache/cassandra/replication/UnreconciledMutationsReplica.java
similarity index 92%
rename from src/java/org/apache/cassandra/replication/LocalMutationStates.java
rename to src/java/org/apache/cassandra/replication/UnreconciledMutationsReplica.java
index ce1f13dd98..821c8db0fc 100644
--- a/src/java/org/apache/cassandra/replication/LocalMutationStates.java
+++ b/src/java/org/apache/cassandra/replication/UnreconciledMutationsReplica.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.schema.TableId;
  * Tracks unreconciled local mutations - the subset of all unreconciled mutations
  * that have been witnessed, or are currently being written to, on the local node.
  */
-class LocalMutationStates
+class UnreconciledMutationsReplica implements UnreconciledMutations
 {
     private final Int2ObjectHashMap<Entry> statesMap = new Int2ObjectHashMap<>();
     private final SortedSet<Entry> statesSet = new TreeSet<>(Entry.comparator);
@@ -115,14 +115,16 @@ class LocalMutationStates
         }
     }
 
-    void startWriting(Mutation mutation)
+    @Override
+    public void startWriting(Mutation mutation)
     {
         Entry entry = Entry.create(mutation);
         statesMap.put(entry.offset, entry);
         statesSet.add(entry);
     }
 
-    void finishWriting(Mutation mutation)
+    @Override
+    public void finishWriting(Mutation mutation)
     {
         Preconditions.checkArgument(!mutation.id().isNone());
         Entry entry = statesMap.get(mutation.id().offset());
@@ -130,20 +132,23 @@ class LocalMutationStates
         entry.visibility = Visibility.VISIBLE;
     }
 
-    void remove(int offset)
+    @Override
+    public void remove(int offset)
     {
         Entry state = statesMap.remove(offset);
         Preconditions.checkNotNull(state);
         statesSet.remove(state);
     }
 
-    boolean collect(Token token, TableId tableId, boolean includePending, Offsets.OffsetReciever into)
+    @Override
+    public boolean collect(Token token, TableId tableId, boolean includePending, Offsets.OffsetReciever into)
     {
         SortedSet<Entry> subset = statesSet.subSet(Entry.start(token, true), Entry.end(token, true));
         return collect(subset, tableId, includePending, into);
     }
 
-    boolean collect(AbstractBounds<PartitionPosition> range, TableId tableId, boolean includePending, Offsets.OffsetReciever into)
+    @Override
+    public boolean collect(AbstractBounds<PartitionPosition> range, TableId tableId, boolean includePending, Offsets.OffsetReciever into)
     {
         Entry start = Entry.start(range.left.getToken(), range.left.kind() != PartitionPosition.Kind.MAX_BOUND);
         Entry end = Entry.end(range.right.getToken(), range.right.kind() != PartitionPosition.Kind.MIN_BOUND);
diff --git a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
index 56528f89c8..c3eb76a46d 100644
--- a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
@@ -56,7 +56,7 @@ public class TrackedWriteResponseHandler extends AbstractWriteResponseHandler<No
     {
         // Local mutations are witnessed from Keyspace.applyInternalTracked
         if (msg != null)
-            MutationTrackingService.instance.witnessedRemoteMutation(keyspace, token, mutationId, msg.from());
+            MutationTrackingService.instance.receivedWriteResponse(keyspace, token, mutationId, msg.from());
         wrapped.onResponse(msg);
     }
 
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java
index 3f0e3ca3a5..6a72a7a01a 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileNotify.java
@@ -68,7 +68,7 @@ public class ReadReconcileNotify
         }
     };
 
-    public static final IVersionedSerializer<ReadReconcileNotify> serializer = new IVersionedSerializer<ReadReconcileNotify>()
+    public static final IVersionedSerializer<ReadReconcileNotify> serializer = new IVersionedSerializer<>()
     {
         @Override
         public void serialize(ReadReconcileNotify notify, DataOutputPlus out, int version) throws IOException
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
new file mode 100644
index 0000000000..5cd9c8cb2c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.tracking;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.replication.MutationSummary;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.getOnlyLogId;
+
+public class OffsetBroadcastTest extends TestBaseImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(OffsetBroadcastTest.class);
+
+    @Test
+    public void testBroadcastOffsets() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP)
+                                                            .set("mutation_tracking_enabled", "true"))
+                                      .start())
+        {
+
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " +
+                                              "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+                                              "AND replication_type='tracked';"));
+
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int primary key, v int);"));
+
+            String keyspaceName = KEYSPACE;
+
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1, 1)"), ConsistencyLevel.QUORUM);
+
+            for (int i = 1; i <= cluster.size(); ++i)
+                cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+            for (int i = 1; i <= cluster.size(); ++i)
+            {
+                cluster.get(i).runOnInstance(() -> {
+                    TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl");
+                    DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+                    MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false);
+                    MutationSummary.CoordinatorSummary coordinatorSummary = summary.get(getOnlyLogId(summary));
+                    Assert.assertEquals(1, coordinatorSummary.reconciled.offsetCount());
+                    Assert.assertEquals(0, coordinatorSummary.unreconciled.offsetCount());
+                });
+            }
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
index 37bc1cd080..f89f1a521b 100644
--- a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
+++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java
@@ -68,7 +68,7 @@ public class CoordinatorLogTest
     {
         Offsets.Mutable list = new Offsets.Mutable(LOG_ID);
         for (MutationId id : ids)
-            list.append(id.offset());
+            list.add(id.offset());
         return list;
     }
 
@@ -119,10 +119,10 @@ public class CoordinatorLogTest
         // the call to finishWriting will have made the ids visible without the includePending flag
         assertUnreconciled(tk, tableId, log, false, reconciled, ids);
 
-        log.witnessedRemoteMutation(ids[0], PARTICIPANTS.get(1));
+        log.receivedWriteResponse(ids[0], PARTICIPANTS.get(1));
         assertUnreconciled(tk, tableId, log, false, reconciled, ids);
 
-        log.witnessedRemoteMutation(ids[0], PARTICIPANTS.get(2));
+        log.receivedWriteResponse(ids[0], PARTICIPANTS.get(2));
         reconciled.add(ids[0].offset());
         assertUnreconciled(tk, tableId, log, false, reconciled, ids[1], ids[2]);
     }
diff --git a/test/unit/org/apache/cassandra/replication/OffsetsTest.java b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
index 539da7d1b0..1d114a62f2 100644
--- a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
@@ -510,22 +510,22 @@ public class OffsetsTest
     public void appendTest()
     {
         Offsets.Mutable ids = new Offsets.Mutable(LOG_ID);
-        ids.append(5);
+        ids.add(5);
         assertEquals(1, ids.rangeCount());
         assertEquals(1, ids.offsetCount());
 
-        ids.append(6);
+        ids.add(6);
         assertEquals(1, ids.rangeCount());
         assertEquals(2, ids.offsetCount());
 
-        ids.append(8);
+        ids.add(8);
         assertEquals(2, ids.rangeCount());
         assertEquals(3, ids.offsetCount());
 
         // insert before tail
         try
         {
-            ids.append(8);
+            ids.add(8);
             Assert.fail();
         }
         catch (IllegalArgumentException e)
@@ -538,7 +538,7 @@ public class OffsetsTest
         // insert before tail
         try
         {
-            ids.append(7);
+            ids.add(7);
             Assert.fail();
         }
         catch (IllegalArgumentException e)

