This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 96d850aa9d90787aa8fc6668d9a826beb5d94e2d Author: Abe Ratnofsky <a...@aber.io> AuthorDate: Tue Mar 25 12:41:57 2025 -0400 CEP-45: Query forwarding Patch by Abe Ratnofsky; Reviewed by Blake Eggleston for CASSANDRA-20309 --- CHANGES.txt | 1 + .../apache/cassandra/db/MutationVerbHandler.java | 20 +- src/java/org/apache/cassandra/net/Message.java | 7 + .../org/apache/cassandra/net/MessagingService.java | 10 + src/java/org/apache/cassandra/net/ParamType.java | 5 +- .../org/apache/cassandra/net/RequestCallbacks.java | 6 +- src/java/org/apache/cassandra/net/Verb.java | 2 + .../cassandra/replication/CoordinatorLog.java | 3 + .../cassandra/replication/CoordinatorLogId.java | 19 + .../cassandra/replication/ForwardedWrite.java | 479 +++++++++++++++++++++ .../cassandra/replication/LocalMutationStates.java | 1 + .../apache/cassandra/replication/MutationId.java | 2 +- .../cassandra/replication/MutationSummary.java | 1 + .../replication/MutationTrackingService.java | 6 + .../org/apache/cassandra/replication/Shard.java | 4 +- .../cassandra/replication/TrackedWriteRequest.java | 86 ++-- .../service/TrackedWriteResponseHandler.java | 3 +- .../apache/cassandra/tcm/membership/NodeId.java | 22 + .../MutationTrackingWriteForwardingTest.java | 117 +++++ 19 files changed, 756 insertions(+), 38 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index fb9f8fbeed..0bc461129a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ cep-45-mutation-tracking + * CEP-45: Query forwarding (CASSANDRA-20309) * Fix mutation tracking startup (CASSANDRA-20540) * Mutation tracking journal integration, read, and write path (CASSANDRA-20304, CASSANDRA-20305, CASSANDRA-20308) * Introduce MutationJournal for coordinator logs (CASSANDRA-20353) diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java index c30fae63b4..96e3fa7e7e 100644 --- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java @@ -17,9 +17,13 @@ */ package org.apache.cassandra.db; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.*; +import org.apache.cassandra.replication.ForwardedWrite; import org.apache.cassandra.tracing.Tracing; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -28,12 +32,24 @@ import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; public class MutationVerbHandler extends AbstractMutationVerbHandler<Mutation> { + private static final Logger logger = LoggerFactory.getLogger(MutationVerbHandler.class); + public static final MutationVerbHandler instance = new MutationVerbHandler(); - private void respond(Message<?> respondTo, InetAddressAndPort respondToAddress) + private void respond(Message<?> incoming, InetAddressAndPort respondToAddress) { + // Local tracked writes respond in TrackedWriteResponseHandler + Message<NoPayload> response = incoming.emptyResponse(); Tracing.trace("Enqueuing response to {}", respondToAddress); - MessagingService.instance().send(respondTo.emptyResponse(), respondToAddress); + logger.trace("Enqueuing response to {}", respondToAddress); + MessagingService.instance().send(response, respondToAddress); + + ForwardedWrite.CoordinatorAckInfo ackTo = (ForwardedWrite.CoordinatorAckInfo) incoming.header.params().get(ParamType.COORDINATOR_ACK_INFO); + if (ackTo != null) + { + 
logger.trace("Enqueuing response for direct acknowledgement of forwarded tracked mutation to coordinator {}", ackTo.coordinator); + MessagingService.instance().send(response, ackTo.coordinator); + } } private void failed() diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java index b6a5a849e5..fe041b816f 100644 --- a/src/java/org/apache/cassandra/net/Message.java +++ b/src/java/org/apache/cassandra/net/Message.java @@ -731,6 +731,13 @@ public class Message<T> return this; } + public Builder<T> withRequestTime(Dispatcher.RequestTime requestTime) + { + this.createdAtNanos = requestTime.startedAtNanos(); + this.expiresAtNanos = requestTime.computeDeadline(verb.expiresAfterNanos()); + return this; + } + public Message<T> build() { if (verb == null) diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 97a019642a..7ddada9e0a 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.metrics.MessagingMetrics; +import org.apache.cassandra.replication.ForwardedWrite; import org.apache.cassandra.service.AbstractWriteResponseHandler; import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; @@ -449,6 +451,14 @@ public class MessagingService extends MessagingServiceMBeanImpl implements Messa send(message, to.endpoint(), null); } + public void sendForwardedWriteWithCallback(Message message, Replica to, ForwardedWrite.LeaderCallback handler) + { + Preconditions.checkArgument(message.verb() == Verb.MUTATION_REQ); + Preconditions.checkArgument(message.callBackOnFailure()); + callbacks.addWithExpiration(handler, message, to.endpoint()); + send(message, to.endpoint(), null); + } + /** * Send a message to a given endpoint. This method adheres to the fire and forget * style messaging. 
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java index 2367b1a390..e01a3ac869 100644 --- a/src/java/org/apache/cassandra/net/ParamType.java +++ b/src/java/org/apache/cassandra/net/ParamType.java @@ -20,6 +20,7 @@ package org.apache.cassandra.net; import javax.annotation.Nullable; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.replication.ForwardedWrite; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.Int32Serializer; import org.apache.cassandra.utils.Int64Serializer; @@ -54,7 +55,9 @@ public enum ParamType ROW_INDEX_READ_SIZE_WARN (13, Int64Serializer.serializer), CUSTOM_MAP (14, CustomParamsSerializer.serializer), TOO_MANY_REFERENCED_INDEXES_WARN (16, Int32Serializer.serializer), - TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer); + TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer), + // Different from RESPOND_TO because it's an additional recipient of the acknowledgement + COORDINATOR_ACK_INFO (18, ForwardedWrite.CoordinatorAckInfo.serializer); final int id; final IVersionedSerializer serializer; diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java b/src/java/org/apache/cassandra/net/RequestCallbacks.java index ee63c5a3e6..4a46b73b5a 100644 --- a/src/java/org/apache/cassandra/net/RequestCallbacks.java +++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.slf4j.Logger; @@ -35,6 +36,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.metrics.InternodeOutboundMetrics; +import org.apache.cassandra.replication.ForwardedWrite; import org.apache.cassandra.service.AbstractWriteResponseHandler; import static java.lang.String.format; @@ -96,14 +98,14 @@ public class RequestCallbacks implements OutboundMessageCallbacks public void addWithExpiration(RequestCallback<?> cb, Message<?> message, InetAddressAndPort to) { // mutations need to call the overload - assert message.verb() != Verb.MUTATION_REQ && message.verb() != Verb.COUNTER_MUTATION_REQ; + Preconditions.checkArgument((message.verb() != Verb.MUTATION_REQ && message.verb() != Verb.COUNTER_MUTATION_REQ) || (cb instanceof ForwardedWrite.LeaderCallback)); CallbackInfo previous = callbacks.put(key(message.id(), to), new CallbackInfo(message, to, cb)); assert previous == null : format("Callback already exists for id %d/%s! (%s)", message.id(), to, previous); } public void addWithExpiration(AbstractWriteResponseHandler<?> cb, Message<?> message, Replica to) { - assert message.verb() == Verb.MUTATION_REQ || message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ; + Preconditions.checkArgument(message.verb() == Verb.MUTATION_REQ || message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ || message.verb() == Verb.FORWARDING_WRITE); CallbackInfo previous = callbacks.put(key(message.id(), to.endpoint()), new CallbackInfo(message, to.endpoint(), cb)); assert previous == null : format("Callback already exists for id %d/%s! 
(%s)", message.id(), to.endpoint(), previous); } diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index 7dcfc5b473..032270b456 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -72,6 +72,7 @@ import org.apache.cassandra.repair.messages.SyncResponse; import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.repair.messages.ValidationResponse; import org.apache.cassandra.repair.messages.ValidationRequest; +import org.apache.cassandra.replication.ForwardedWrite; import org.apache.cassandra.schema.SchemaMutationsSerializer; import org.apache.cassandra.schema.SchemaPullVerbHandler; import org.apache.cassandra.schema.SchemaPushVerbHandler; @@ -247,6 +248,7 @@ public enum Verb READ_RECONCILE_SEND (901, P0, rpcTimeout, READ, () -> ReadReconcileSend.serializer, () -> ReadReconcileSend.verbHandler ), READ_RECONCILE_RCV (902, P0, rpcTimeout, MUTATION, () -> ReadReconcileReceive.serializer, () -> ReadReconcileReceive.verbHandler ), READ_RECONCILE_NOTIFY (903, P0, rpcTimeout, REQUEST_RESPONSE, () -> ReadReconcileNotify.serializer, () -> ReadReconcileNotify.verbHandler ), + FORWARDING_WRITE (904, P3, writeTimeout, MUTATION, () -> ForwardedWrite.Request.serializer, () -> ForwardedWrite.verbHandler), TRACKED_PARTITION_READ_RSP (906, P2, readTimeout, REQUEST_RESPONSE, () -> TrackedDataResponse.serializer, () -> ResponseVerbHandler.instance ), TRACKED_PARTITION_READ_REQ (907, P3, readTimeout, READ, () -> TrackedRead.DataRequest.serializer, () -> TrackedRead.verbHandler, TRACKED_PARTITION_READ_RSP), diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java index 0abd0db93e..74712fd968 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java @@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.base.Preconditions; + import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; @@ -72,6 +74,7 @@ public abstract class CoordinatorLog void witnessedRemoteMutation(MutationId mutationId, int onHostId) { + Preconditions.checkArgument(!mutationId.isNone()); logger.trace("witnessed remote mutation {} from {}", mutationId, onHostId); lock.writeLock().lock(); try diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java index 55c38485b0..37d4f25851 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java @@ -28,6 +28,8 @@ import java.util.Comparator; public class CoordinatorLogId implements Serializable { + private static final CoordinatorLogId NONE = new CoordinatorLogId(Integer.MIN_VALUE, Integer.MIN_VALUE); + /** TCM host ID */ protected final int hostId; @@ -79,6 +81,21 @@ public class CoordinatorLogId implements Serializable return (int) coordinatorLogId; } + public static CoordinatorLogId none() + { + return NONE; + } + + static boolean isNone(int hostId, int hostLogId) + { + return hostId == NONE.hostId && hostLogId == NONE.hostLogId; + } + + public boolean isNone() + { + return this == NONE || isNone(hostId, hostLogId); + 
} + @Override public String toString() { @@ -115,6 +132,8 @@ public class CoordinatorLogId implements Serializable { int hostId = in.readInt(); int hostLogId = in.readInt(); + if (isNone(hostId, hostLogId)) + return none(); return new CoordinatorLogId(hostId, hostLogId); } diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java b/src/java/org/apache/cassandra/replication/ForwardedWrite.java new file mode 100644 index 0000000000..51a072a423 --- /dev/null +++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import org.apache.cassandra.db.*; +import org.apache.cassandra.locator.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessageFlag; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.net.ParamType; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.AbstractWriteResponseHandler; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.net.Verb.MUTATION_REQ; + +/** + * For a forwarded write there are 2 nodes involved in coordination, a coordinator and a leader. The coordinator is the + * node that the client is communicating with, and the leader is the mutation replica that is handling the mutation + * tracking for that write. 
+ */ +public class ForwardedWrite +{ + private static final Logger logger = LoggerFactory.getLogger(ForwardedWrite.class); + + public interface Request + { + enum Kind + { + MUTATION(0); + + private final byte id; + + Kind(int id) + { + this.id = (byte) id; + } + + IVersionedSerializer<Request> serializer() + { + switch (this) + { + case MUTATION: + return MutationRequest.serializer; + default: + throw new IllegalStateException("Unhandled kind " + this); + } + } + + static final IVersionedSerializer<Kind> serializer = new IVersionedSerializer<Request.Kind>() + { + @Override + public void serialize(Kind kind, DataOutputPlus out, int version) throws IOException + { + out.writeByte(kind.id); + + } + + @Override + public Kind deserialize(DataInputPlus in, int version) throws IOException + { + byte id = in.readByte(); + switch (id) + { + case 0: + return MUTATION; + default: + throw new IllegalStateException("Unknown kind: " + id); + } + } + + @Override + public long serializedSize(Kind kind, int version) + { + return TypeSizes.BYTE_SIZE; + } + }; + } + + Kind kind(); + DecoratedKey key(); + void applyLocallyAndForwardToReplicas(CoordinatorAckInfo ackTo); + + IVersionedSerializer<Request> serializer = new IVersionedSerializer<>() + { + @Override + public void serialize(Request request, DataOutputPlus out, int version) throws IOException + { + Kind.serializer.serialize(request.kind(), out, version); + request.kind().serializer().serialize(request, out, version); + } + + @Override + public Request deserialize(DataInputPlus in, int version) throws IOException + { + Kind kind = Kind.serializer.deserialize(in, version); + return kind.serializer().deserialize(in, version); + } + + @Override + public long serializedSize(Request request, int version) + { + long size = Kind.serializer.serializedSize(request.kind(), version); + size += request.kind().serializer().serializedSize(request, version); + return size; + } + }; + } + + public static class MutationRequest implements Request + { + private final Mutation mutation; + private final Set<NodeId> recipients; + + private static Set<NodeId> nodeIds(ReplicaPlan.ForWrite plan) + { + ClusterMetadata cm = ClusterMetadata.current(); + Set<NodeId> recipients = new HashSet<>(plan.liveAndDown().size()); + for (Replica replica : plan.liveAndDown()) + recipients.add(cm.directory.peerId(replica.endpoint())); + return recipients; + } + + MutationRequest(Mutation mutation, ReplicaPlan.ForWrite plan) + { + this(mutation, nodeIds(plan)); + } + + public MutationRequest(Mutation mutation, Set<NodeId> recipients) + { + Preconditions.checkArgument(mutation.id().isNone()); + this.mutation = mutation; + this.recipients = recipients; + } + + @Override + public Kind kind() + { + return Kind.MUTATION; + } + + @Override + public DecoratedKey key() + { + return mutation.key(); + } + + @Override + public void applyLocallyAndForwardToReplicas(CoordinatorAckInfo ackTo) + { + Preconditions.checkState(ackTo != null); + Preconditions.checkArgument(mutation.id().isNone()); + String keyspaceName = mutation.getKeyspaceName(); + Token token = mutation.key().getToken(); + + MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); + // Do not wait for handler completion, since the coordinator is already waiting and we don't want to block the stage + LeaderCallback handler = new LeaderCallback(keyspaceName, mutation.key().getToken(), id, ackTo); + applyLocallyAndForwardToReplicas(mutation.withMutationId(id), recipients, handler, ackTo); + } + + // TODO: refactor 
common with applyLocallyAndSendToReplicas + private static void applyLocallyAndForwardToReplicas(Mutation mutation, Set<NodeId> recipients, LeaderCallback handler, CoordinatorAckInfo ackTo) + { + Preconditions.checkState(ackTo != null); + ClusterMetadata cm = ClusterMetadata.current(); + String localDataCenter = cm.locator.local().datacenter; + + boolean applyLocally = false; + + // this DC replicas + List<Replica> localDCReplicas = null; + + // extra-DC, grouped by DC + Map<String, List<Replica>> remoteDCReplicas = null; + + // only need to create a Message for non-local writes + Message<Mutation> message = null; + + // Expensive, but easier to work with Replica than InetAddressAndPort for now + Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + EndpointsForToken endpoints = cm.placements.get(keyspace.getMetadata().params.replication).writes.forToken(mutation.key().getToken()).get(); + Map<NodeId, Replica> replicas = new HashMap<>(recipients.size()); + for (Replica replica : endpoints) + replicas.put(cm.directory.peerId(replica.endpoint()), replica); + + // For performance, Mutation caches serialized buffers that are computed lazily in serializedBuffer(). That + // computation is not synchronized however, and we will potentially call that method concurrently for each + // dispatched message (not that concurrent calls to serializedBuffer() are "unsafe" per se, just that they + // may result in multiple computations, making the caching optimization moot). So forcing the serialization + // here to make sure it's already cached/computed when it's concurrently used later. + // Side note: we have one cached buffers for each used EncodingVersion and this only pre-compute the one for + // the current version, but it's just an optimization, and we're ok not optimizing for mixed-version clusters. 
+ Mutation.serializer.prepareSerializedBuffer(mutation, MessagingService.current_version); + + for (NodeId recipient : recipients) + { + if (cm.myNodeId().equals(recipient)) + { + applyLocally = true; + continue; + } + + if (message == null) + message = Message.builder(MUTATION_REQ, mutation) + .withRequestTime(handler.getRequestTime()) + .withFlag(MessageFlag.CALL_BACK_ON_FAILURE) + .withParam(ParamType.COORDINATOR_ACK_INFO, ackTo) + .withId(ackTo.id) + .build(); + + Replica replica = replicas.get(recipient); + String dc = cm.locator.location(replica.endpoint()).datacenter; + + if (localDataCenter.equals(dc)) + { + if (localDCReplicas == null) + localDCReplicas = new ArrayList<>(); + localDCReplicas.add(replica); + } + else + { + if (remoteDCReplicas == null) + remoteDCReplicas = new HashMap<>(); + + List<Replica> messages = remoteDCReplicas.get(dc); + if (messages == null) + messages = remoteDCReplicas.computeIfAbsent(dc, ignore -> new ArrayList<>(3)); // most DCs will have <= 3 replicas + messages.add(replica); + } + } + + Preconditions.checkState(applyLocally); // the leader is always a replica + TrackedWriteRequest.applyMutationLocally(mutation, handler); + + if (localDCReplicas != null) + for (Replica replica : localDCReplicas) + MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler); + + if (remoteDCReplicas != null) + { + // for each datacenter, send the message to one node to relay the write to other replicas + for (List<Replica> dcReplicas : remoteDCReplicas.values()) + TrackedWriteRequest.sendMessagesToRemoteDC(message, EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler, ackTo); + } + } + + public static final IVersionedSerializer<Request> serializer = new IVersionedSerializer<>() + { + @Override + public void serialize(Request r, DataOutputPlus out, int version) throws IOException + { + MutationRequest request = (MutationRequest) r; + Mutation.serializer.serialize(request.mutation, out, version); + out.writeInt(request.recipients.size()); + for (NodeId recipient : request.recipients) + NodeId.messagingSerializer.serialize(recipient, out, version); + } + + @Override + public Request deserialize(DataInputPlus in, int version) throws IOException + { + Mutation mutation = Mutation.serializer.deserialize(in, version); + int numRecipients = in.readInt(); + Set<NodeId> recipients = Sets.newHashSetWithExpectedSize(numRecipients); + for (int i = 0; i < numRecipients; i++) + recipients.add(NodeId.messagingSerializer.deserialize(in, version)); + return new MutationRequest(mutation, recipients); + } + + @Override + public long serializedSize(Request r, int version) + { + MutationRequest request = (MutationRequest) r; + long size = Mutation.serializer.serializedSize(request.mutation, version); + size += TypeSizes.INT_SIZE; + for (NodeId recipient : request.recipients) + size += NodeId.messagingSerializer.serializedSize(recipient, version); + return size; + } + }; + } + + public static AbstractWriteResponseHandler<Object> forwardMutation(Mutation mutation, ReplicaPlan.ForWrite plan, AbstractReplicationStrategy strategy, Dispatcher.RequestTime requestTime) + { + // find leader + NodeProximity proximity = DatabaseDescriptor.getNodeProximity(); + ClusterMetadata cm = ClusterMetadata.current(); + Token token = mutation.key().getToken(); + Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + EndpointsForRange endpoints = cm.placements.get(keyspace.getMetadata().params.replication).writes.forRange(token).get(); + if 
(logger.isTraceEnabled()) + logger.trace("Finding best leader from replicas {}", endpoints); + + // TODO: Should match ReplicaPlans.findCounterLeaderReplica, including DC-local priority, current health, severity, etc. + Replica leader = proximity.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), endpoints).get(0); + + // create callback and forward to leader + if (logger.isTraceEnabled()) + logger.trace("Selected {} as leader for mutation with key {}", leader.endpoint(), mutation.key()); + + AbstractWriteResponseHandler<Object> handler = strategy.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null, requestTime); + + // Add callbacks for replicas to respond directly to coordinator + Message<Request> toLeader = Message.out(Verb.FORWARDING_WRITE, new MutationRequest(mutation, plan)); + for (Replica endpoint : endpoints) + { + if (logger.isTraceEnabled()) + logger.trace("Adding forwarding callback for response from {} id {}", endpoint, toLeader.id()); + MessagingService.instance().callbacks.addWithExpiration(handler, toLeader, endpoint); + } + + MessagingService.instance().send(toLeader, leader.endpoint()); + + return handler; + } + + public static final IVerbHandler<Request> verbHandler = new IVerbHandler<>() + { + @Override + public void doVerb(Message<Request> incoming) + { + if (logger.isTraceEnabled()) + logger.trace("Received incoming ForwardedWriteRequest {} id {}", incoming, incoming.id()); + CoordinatorAckInfo ackTo = CoordinatorAckInfo.toCoordinator(incoming.from(), incoming.id()); + Request request = incoming.payload; + + // Once we support epoch changes, check epoch from coordinator here, after potential queueing on the Stage + try + { + request.applyLocallyAndForwardToReplicas(ackTo); + } + catch (Exception e) + { + logger.error("Exception while executing forwarded write with key {} on leader", request.key(), e); + MessagingService.instance().respondWithFailure(RequestFailureReason.UNKNOWN, incoming); + } + } + }; + + // Leader just needs to acknowledge propagation for its own log, not for client consistency level + // See org.apache.cassandra.service.TrackedWriteResponseHandler.onResponse, this class should probably merge with that one + public static class LeaderCallback implements RequestCallback<NoPayload> + { + private final String keyspace; + private final Token token; + private final MutationId id; + private final CoordinatorAckInfo ackTo; + private final Dispatcher.RequestTime requestTime = Dispatcher.RequestTime.forImmediateExecution(); + + public LeaderCallback(String keyspace, Token token, MutationId id, CoordinatorAckInfo ackTo) + { + this.keyspace = keyspace; + this.token = token; + this.id = id; + this.ackTo = ackTo; + } + + @Override + public void onResponse(Message<NoPayload> msg) + { + // Local mutations are witnessed from Keyspace.applyInternalTracked + if (msg != null) + MutationTrackingService.instance.witnessedRemoteMutation(keyspace, token, id, msg.from()); + + // Local write needs to be ack'd to coordinator + if (msg == null && ackTo != null) + { + Message<NoPayload> message = Message.builder(Verb.MUTATION_RSP, NoPayload.noPayload) + .from(FBUtilities.getBroadcastAddressAndPort()) + .withId(ackTo.id) + .build(); + MessagingService.instance().send(message, ackTo.coordinator); + } + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) + { + logger.error("Got failure from {} reason {}", from, failureReason); + } + + @Override + public boolean invokeOnFailure() + { + return true; + } + + public 
Dispatcher.RequestTime getRequestTime() + { + return requestTime; + } + } + + public static class CoordinatorAckInfo + { + public static IVersionedSerializer<CoordinatorAckInfo> serializer = new IVersionedSerializer<>() + { + @Override + public void serialize(CoordinatorAckInfo ackTo, DataOutputPlus out, int version) throws IOException + { + InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(ackTo.coordinator, out, version); + out.writeLong(ackTo.id); + } + + @Override + public CoordinatorAckInfo deserialize(DataInputPlus in, int version) throws IOException + { + InetAddressAndPort coordinator = InetAddressAndPort.Serializer.inetAddressAndPortSerializer.deserialize(in, version); + long id = in.readLong(); + return new CoordinatorAckInfo(coordinator, id); + } + + @Override + public long serializedSize(CoordinatorAckInfo ackTo, int version) + { + long size = 0; + size += InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(ackTo.coordinator, version); + size += TypeSizes.LONG_SIZE; + return size; + } + }; + + public final InetAddressAndPort coordinator; + public final long id; + + private CoordinatorAckInfo(InetAddressAndPort coordinator, long id) + { + this.coordinator = coordinator; + this.id = id; + } + + private static CoordinatorAckInfo toCoordinator(InetAddressAndPort coordinator, long messageId) + { + return new CoordinatorAckInfo(coordinator, messageId); + } + } +} diff --git a/src/java/org/apache/cassandra/replication/LocalMutationStates.java b/src/java/org/apache/cassandra/replication/LocalMutationStates.java index 9bb1b4d834..ce1f13dd98 100644 --- a/src/java/org/apache/cassandra/replication/LocalMutationStates.java +++ b/src/java/org/apache/cassandra/replication/LocalMutationStates.java @@ -124,6 +124,7 @@ class LocalMutationStates void finishWriting(Mutation mutation) { + Preconditions.checkArgument(!mutation.id().isNone()); Entry entry = statesMap.get(mutation.id().offset()); Preconditions.checkNotNull(entry); entry.visibility = Visibility.VISIBLE; diff --git a/src/java/org/apache/cassandra/replication/MutationId.java b/src/java/org/apache/cassandra/replication/MutationId.java index ea33708833..5faf4db7d5 100644 --- a/src/java/org/apache/cassandra/replication/MutationId.java +++ b/src/java/org/apache/cassandra/replication/MutationId.java @@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; */ public class MutationId extends ShortMutationId { - private static final long NONE_LOG_ID = Long.MIN_VALUE; + private static final long NONE_LOG_ID = CoordinatorLogId.none().asLong(); private static final long NONE_SEQUENCE_ID = Long.MIN_VALUE; private static final int NONE_OFFSET = offset(NONE_SEQUENCE_ID); private static final int NONE_TIMESTAMP = timestamp(NONE_SEQUENCE_ID); diff --git a/src/java/org/apache/cassandra/replication/MutationSummary.java b/src/java/org/apache/cassandra/replication/MutationSummary.java index 3483f710ec..53a18d1046 100644 --- a/src/java/org/apache/cassandra/replication/MutationSummary.java +++ b/src/java/org/apache/cassandra/replication/MutationSummary.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.*; import com.google.common.base.Preconditions; + import org.agrona.collections.Long2ObjectHashMap; import org.apache.cassandra.db.Digest; import org.apache.cassandra.db.TypeSizes; diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index 0c48c3e498..3e48840531 100644 --- 
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.IntSupplier; +import com.google.common.base.Preconditions; + import org.agrona.collections.IntArrayList; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; @@ -92,16 +94,19 @@ public class MutationTrackingService public void witnessedRemoteMutation(String keyspace, Token token, MutationId mutationId, InetAddressAndPort onHost) { + Preconditions.checkArgument(!mutationId.isNone()); getOrCreate(keyspace).witnessedRemoteMutation(token, mutationId, onHost); } public void startWriting(Mutation mutation) { + Preconditions.checkArgument(!mutation.id().isNone()); getOrCreate(mutation.getKeyspaceName()).startWriting(mutation); } public void finishWriting(Mutation mutation) { + Preconditions.checkArgument(!mutation.id().isNone()); getOrCreate(mutation.getKeyspaceName()).finishWriting(mutation); } @@ -153,6 +158,7 @@ public class MutationTrackingService static KeyspaceShards make(KeyspaceMetadata keyspace, ClusterMetadata cluster, IntSupplier logIdProvider) { + Preconditions.checkArgument(keyspace.params.replicationType.isTracked()); Map<Range<Token>, Shard> shards = new HashMap<>(); cluster.placements.get(keyspace.params.replication).writes.forEach((tokenRange, forRange) -> { IntArrayList participants = new IntArrayList(forRange.size(), IntArrayList.DEFAULT_NULL_VALUE); diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java index 538e9a57c0..72ff1119ca 100644 --- a/src/java/org/apache/cassandra/replication/Shard.java +++ b/src/java/org/apache/cassandra/replication/Shard.java @@ -52,7 +52,9 @@ public class Shard this.sinceEpoch = sinceEpoch; this.logs = new NonBlockingHashMapLong<>(); this.currentLocalLog = startNewLog(localHostId, logIdProvider.getAsInt(), participants); - logs.put(currentLocalLog.logId.asLong(), currentLocalLog); + CoordinatorLogId logId = currentLocalLog.logId; + Preconditions.checkArgument(!logId.isNone()); + logs.put(logId.asLong(), currentLocalLog); } MutationId nextId() diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java index 5ff3acb34e..7ae7b35e77 100644 --- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java +++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java @@ -49,14 +49,17 @@ import org.apache.cassandra.net.ForwardingInfo; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageFlag; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.net.ParamType; +import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.AbstractWriteResponseHandler; import org.apache.cassandra.service.TrackedWriteResponseHandler; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MonotonicClock; -import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics; import static 
org.apache.cassandra.net.Verb.MUTATION_REQ; @@ -72,34 +75,37 @@ public class TrackedWriteRequest * @param consistencyLevel the consistency level for the write operation * @param requestTime object holding times when request got enqueued and started execution */ - public static TrackedWriteResponseHandler perform( + public static AbstractWriteResponseHandler<?> perform( Mutation mutation, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime) { Tracing.trace("Determining replicas for mutation"); + Preconditions.checkArgument(mutation.id().isNone()); String keyspaceName = mutation.getKeyspaceName(); Keyspace keyspace = Keyspace.open(keyspaceName); Token token = mutation.key().getToken(); - MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); - mutation = mutation.withMutationId(id); - ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(keyspace, consistencyLevel, token, ReplicaPlans.writeNormal); + AbstractReplicationStrategy rs = plan.replicationStrategy(); - if (plan.lookup(FBUtilities.getBroadcastAddressAndPort()) != null) - writeMetrics.localRequests.mark(); - else + if (plan.lookup(FBUtilities.getBroadcastAddressAndPort()) == null) + { + if (logger.isTraceEnabled()) + logger.trace("Remote tracked request {} {}", mutation, plan); writeMetrics.remoteRequests.mark(); + return ForwardedWrite.forwardMutation(mutation, plan, rs, requestTime); + } - AbstractReplicationStrategy rs = plan.replicationStrategy(); - - TrackedWriteResponseHandler handler = - TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null, requestTime), - keyspaceName, - mutation.key().getToken(), - id); + if (logger.isTraceEnabled()) + logger.trace("Local tracked request {} {}", mutation, plan); + writeMetrics.localRequests.mark(); + MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); + mutation = mutation.withMutationId(id); + TrackedWriteResponseHandler handler = TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null, requestTime), + keyspaceName, + mutation.key().getToken(), + id); applyLocallyAndSendToReplicas(mutation, plan, handler); - return handler; } @@ -142,7 +148,12 @@ public class TrackedWriteRequest } if (message == null) - message = Message.outWithFlags(MUTATION_REQ, mutation, handler.getRequestTime(), singletonList(MessageFlag.CALL_BACK_ON_FAILURE)); + { + Message.Builder<Mutation> builder = Message.builder(MUTATION_REQ, mutation) + .withRequestTime(handler.getRequestTime()) + .withFlag(MessageFlag.CALL_BACK_ON_FAILURE); + message = builder.build(); + } String dc = DatabaseDescriptor.getLocator().location(destination.endpoint()).datacenter; @@ -175,31 +186,42 @@ public class TrackedWriteRequest { // for each datacenter, send the message to one node to relay the write to other replicas for (List<Replica> dcReplicas : remoteDCReplicas.values()) - sendMessagesToRemoteDC(message, EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler); + sendMessagesToRemoteDC(message, EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler, null); } } - private static void applyMutationLocally(Mutation mutation, TrackedWriteResponseHandler handler) + static void applyMutationLocally(Mutation mutation, RequestCallback<NoPayload> handler) { + Preconditions.checkArgument(handler instanceof TrackedWriteResponseHandler || handler instanceof ForwardedWrite.LeaderCallback); Stage.MUTATION.maybeExecuteImmediately(new 
LocalMutationRunnable(mutation, handler)); } private static class LocalMutationRunnable implements DebuggableTask.RunnableDebuggableTask { private final Mutation mutation; - private final TrackedWriteResponseHandler handler; + private final RequestCallback<NoPayload> handler; - LocalMutationRunnable(Mutation mutation, TrackedWriteResponseHandler handler) + LocalMutationRunnable(Mutation mutation, RequestCallback<NoPayload> handler) { + Preconditions.checkArgument(handler instanceof TrackedWriteResponseHandler || handler instanceof ForwardedWrite.LeaderCallback); this.mutation = mutation; this.handler = handler; } + private Dispatcher.RequestTime getRequestTime() + { + if (handler instanceof TrackedWriteResponseHandler) + return ((TrackedWriteResponseHandler) handler).getRequestTime(); + if (handler instanceof ForwardedWrite.LeaderCallback) + return ((ForwardedWrite.LeaderCallback) handler).getRequestTime(); + throw new IllegalStateException(); + } + @Override public final void run() { long now = MonotonicClock.Global.approxTime.now(); - long deadline = handler.getRequestTime().computeDeadline(MUTATION_REQ.expiresAfterNanos()); + long deadline = getRequestTime().computeDeadline(MUTATION_REQ.expiresAfterNanos()); if (now > deadline) { @@ -224,13 +246,13 @@ public class TrackedWriteRequest @Override public long creationTimeNanos() { - return handler.getRequestTime().enqueuedAtNanos(); + return getRequestTime().enqueuedAtNanos(); } @Override public long startTimeNanos() { - return handler.getRequestTime().startedAtNanos(); + return getRequestTime().startedAtNanos(); } @Override @@ -245,9 +267,10 @@ public class TrackedWriteRequest /* * Send the message to the first replica of targets, and have it forward the message to others in its DC */ - private static void sendMessagesToRemoteDC(Message<? extends IMutation> message, - EndpointsForToken targets, - TrackedWriteResponseHandler handler) + static void sendMessagesToRemoteDC(Message<? 
extends IMutation> message, + EndpointsForToken targets, + RequestCallback<NoPayload> handler, + ForwardedWrite.CoordinatorAckInfo ackTo) { final Replica target; @@ -258,7 +281,7 @@ public class TrackedWriteRequest for (Replica replica : forwardToReplicas) { - MessagingService.instance().callbacks.addWithExpiration(handler, message, replica); + MessagingService.instance().callbacks.addWithExpiration(handler, message, replica.endpoint()); logger.trace("Adding FWD message to {}@{}", message.id(), replica); } @@ -272,9 +295,14 @@ public class TrackedWriteRequest { target = targets.get(0); } + if (ackTo != null) + message = message.withParam(ParamType.COORDINATOR_ACK_INFO, ackTo); Tracing.trace("Sending mutation to remote replica {}", target); - MessagingService.instance().sendWriteWithCallback(message, target, handler); + if (handler instanceof ForwardedWrite.LeaderCallback) + MessagingService.instance().sendForwardedWriteWithCallback(message, target, (ForwardedWrite.LeaderCallback) handler); + else + MessagingService.instance().sendWriteWithCallback(message, target, (AbstractWriteResponseHandler<?>) handler); logger.trace("Sending message to {}@{}", message.id(), target); } diff --git a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java index b224cbf1c0..56528f89c8 100644 --- a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java @@ -54,10 +54,9 @@ public class TrackedWriteResponseHandler extends AbstractWriteResponseHandler<No @Override public void onResponse(Message<NoPayload> msg) { - /* local mutations are witnessed from Keyspace.applyInternalTracked */ + // Local mutations are witnessed from Keyspace.applyInternalTracked if (msg != null) MutationTrackingService.instance.witnessedRemoteMutation(keyspace, token, mutationId, msg.from()); - wrapped.onResponse(msg); } diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeId.java b/src/java/org/apache/cassandra/tcm/membership/NodeId.java index f011314815..e9dead773d 100644 --- a/src/java/org/apache/cassandra/tcm/membership/NodeId.java +++ b/src/java/org/apache/cassandra/tcm/membership/NodeId.java @@ -25,6 +25,7 @@ import java.util.UUID; import com.google.common.primitives.Ints; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.tcm.MultiStepOperation; @@ -123,4 +124,25 @@ public class NodeId implements Comparable<NodeId>, MultiStepOperation.SequenceKe return TypeSizes.sizeofUnsignedVInt(t.id); } } + + public static final IVersionedSerializer<NodeId> messagingSerializer = new IVersionedSerializer<NodeId>() + { + @Override + public void serialize(NodeId n, DataOutputPlus out, int version) throws IOException + { + out.writeUnsignedVInt32(n.id); + } + + @Override + public NodeId deserialize(DataInputPlus in, int version) throws IOException + { + return new NodeId(in.readUnsignedVInt32()); + } + + @Override + public long serializedSize(NodeId n, int version) + { + return TypeSizes.sizeofUnsignedVInt(n.id); + } + }; } diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java new file mode 100644 index 
0000000000..108451ff81 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.tracking; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.replication.MutationSummary; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.assertj.core.api.Assertions; + +import static java.lang.String.format; +import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; +import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology; + +public class MutationTrackingWriteForwardingTest extends TestBaseImpl +{ + private static final int NODES = 3; + private static final int RF = 1; + + private static int inst(int i) + { + return (i % NODES) + 1; + } + + @Test + public void testBasicWriteForwarding() throws Throwable + { + // 2 DCs, 1 replica in each, to test forwarding to instances in remote DCs and local DCs + Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(3, (nodeid) -> nodeid % 2 == 1 ? 
dcAndRack("dc1", "rack1") : dcAndRack("dc2", "rack2")); + + // TODO: disable background reconciliation so we can test that writes are reconciling immediately + try (Cluster cluster = Cluster.build(NODES) + .withConfig(cfg -> cfg.with(Feature.NETWORK) + .with(Feature.GOSSIP) + .set("mutation_tracking_enabled", "true") + .set("write_request_timeout", "1000ms")) + .withNodeIdTopology(topology) + .start()) + { + String keyspaceName = "basic_write_forwarding_test"; + String tableName = "tbl"; + cluster.schemaChange(format("CREATE KEYSPACE %s WITH replication = " + + "{'class': 'NetworkTopologyStrategy', 'replication_factor': " + RF + "} " + + "AND replication_type='tracked';", keyspaceName)); + cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v int, primary key (k, c));", keyspaceName, tableName)); + + Map<IInstance, Integer> instanceUnreconciled = new HashMap<>(); + int ROWS = 100; + for (int inserted = 0; inserted < ROWS; inserted++) + { + // Writes should be completed for the client, regardless of whether they are forwarded or not + cluster.coordinator(inst(inserted)).execute(format("INSERT INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName), ConsistencyLevel.ALL, inserted, inserted, inserted); + + // Writes should be ack'd in the journal too, but these could lag behind client acks, so could be + // permissive here. Each write should be reconciled on the leader, unreconciled on the replica (until + // background reconciliation broadcast is implemented), and ignored on others. + IInstance replica = null; + for (IInvokableInstance instance : cluster) + { + int unreconciled = instance.callOnInstance(() -> { + Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); + Range<Token> fullRange = new Range<>(token, token); + TableId tableId = Schema.instance.getTableMetadata(keyspaceName, tableName).id; + MutationSummary summary = MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, true); + return summary.unreconciledIds(); + }); + int lastUnreconciled = instanceUnreconciled.getOrDefault(instance, 0); + int newUnreconciled = unreconciled - lastUnreconciled; + if (newUnreconciled == 1) + { + Assertions.assertThat(replica).isNull(); + replica = instance; + } + instanceUnreconciled.put(instance, unreconciled); + } + Assertions.assertThat(replica).isNotNull(); + } + Assertions.assertThat(instanceUnreconciled).matches(map -> { + int sum = 0; + for (Integer value : map.values()) + sum += value; + return sum == ROWS; + }); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
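
The flow this patch introduces, as described in the ForwardedWrite javadoc above, can be summarized in a small stand-alone sketch: a non-replica coordinator hands the mutation to a "leader" replica, the leader assigns the mutation id, applies locally, and relays the write to the remaining replicas together with the coordinator's ack info (COORDINATOR_ACK_INFO), so every replica can acknowledge the coordinator directly. Everything below is a hypothetical, single-process illustration under those assumptions — the Coordinator/Replica classes, method names, message id, and hard-coded consistency level are invented for the example and are not Cassandra's real messaging API; replica-to-leader acknowledgements used for coordinator-log reconciliation are omitted for brevity.

import java.util.List;

public class ForwardedWriteSketch
{
    // Stand-in for COORDINATOR_ACK_INFO: whom to ack, and under which message id.
    record CoordinatorAckInfo(Coordinator coordinator, long messageId) {}

    // The node the client talks to; in this scenario it is not a replica of the written token.
    static class Coordinator
    {
        private final long messageId = 42L;   // id that replica acks are matched against
        private int acks;

        void write(String mutation, List<Replica> replicas)
        {
            Replica leader = replicas.get(0);  // the real code sorts replicas by proximity
            // One forwarded write to the leader; acks are expected from *all* replicas.
            leader.forwardedWrite(mutation, replicas, new CoordinatorAckInfo(this, messageId));
        }

        void onAck(long id)
        {
            if (id == messageId && ++acks == 2)   // 2 = the example's consistency level
                System.out.println("coordinator: write acknowledged to client");
        }
    }

    // A replica; the leader is simply the replica chosen to run mutation tracking for this write.
    static class Replica
    {
        private final String name;
        Replica(String name) { this.name = name; }

        // Leader-side handling of the forwarded write.
        void forwardedWrite(String mutation, List<Replica> replicas, CoordinatorAckInfo ackTo)
        {
            String tracked = mutation + " [id assigned by " + name + "]"; // leader assigns the mutation id
            apply(tracked, ackTo);                                        // leader applies locally and acks the coordinator
            for (Replica replica : replicas)
                if (replica != this)
                    replica.apply(tracked, ackTo);                        // mutation relayed with the coordinator ack info attached
        }

        // Replica-side handling: apply the mutation, then ack the coordinator directly.
        void apply(String mutation, CoordinatorAckInfo ackTo)
        {
            System.out.println(name + " applied " + mutation);
            ackTo.coordinator().onAck(ackTo.messageId());
        }
    }

    public static void main(String[] args)
    {
        List<Replica> replicas = List.of(new Replica("replica1"), new Replica("replica2"));
        new Coordinator().write("INSERT ...", replicas);
    }
}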