This is an automated email from the ASF dual-hosted git repository.
paulo pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new 7c2f97cd29 Do not submit hints when hinted_handoff_enabled=false
7c2f97cd29 is described below
commit 7c2f97cd29486196b50c65a093e92b0fcd9789d9
Author: Paulo Motta <[email protected]>
AuthorDate: Mon Mar 6 18:17:32 2023 -0500
Do not submit hints when hinted_handoff_enabled=false
- Remove dead WriteCallbackInfo code
Patch by Paulo Motta, Aleksey Yeschenko; Reviewed by Stefan Miklosovic,
Claude Warren for CASSANDRA-18304
Co-authored-by: Aleksey Yeschenko <[email protected]>
---
CHANGES.txt | 1 +
.../apache/cassandra/batchlog/BatchlogManager.java | 2 +-
.../org/apache/cassandra/net/MessagingService.java | 4 +-
.../org/apache/cassandra/net/RequestCallbacks.java | 69 +------------
.../service/AbstractWriteResponseHandler.java | 2 +-
.../org/apache/cassandra/service/StorageProxy.java | 10 +-
.../distributed/test/HintsDisabledTest.java | 74 ++++++++++++++
.../cassandra/net/WriteCallbackInfoTest.java | 107 ---------------------
8 files changed, 88 insertions(+), 181 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c7290f133..1cb56d160f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1.2
+ * Do not submit hints when hinted_handoff_enabled=false (CASSANDRA-18304)
* Fix COPY ... TO STDOUT behavior in cqlsh (CASSANDRA-18353)
* Remove six and Py2SaferScanner merge cruft (CASSANDRA-18354)
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 6ff5aee0cf..6d102b01c0 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -505,7 +505,7 @@ public class BatchlogManager implements BatchlogManagerMBean
ReplayWriteResponseHandler<Mutation> handler = new
ReplayWriteResponseHandler<>(replicaPlan, mutation, nanoTime());
Message<Mutation> message = Message.outWithFlag(MUTATION_REQ,
mutation, MessageFlag.CALL_BACK_ON_FAILURE);
for (Replica replica : liveRemoteOnly.all())
- MessagingService.instance().sendWriteWithCallback(message,
replica, handler, false);
+ MessagingService.instance().sendWriteWithCallback(message,
replica, handler);
return handler;
}
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java
b/src/java/org/apache/cassandra/net/MessagingService.java
index dab6962f5e..f14835661e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -357,10 +357,10 @@ public class MessagingService extends
MessagingServiceMBeanImpl
* @param handler callback interface which is used to pass the responses or
* suggest that a timeout occurred to the invoker of the
send().
*/
- public void sendWriteWithCallback(Message message, Replica to,
AbstractWriteResponseHandler<?> handler, boolean allowHints)
+ public void sendWriteWithCallback(Message message, Replica to,
AbstractWriteResponseHandler<?> handler)
{
assert message.callBackOnFailure();
- callbacks.addWithExpiration(handler, message, to,
handler.consistencyLevel(), allowHints);
+ callbacks.addWithExpiration(handler, message, to);
send(message, to.endpoint(), null);
}
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java
b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index ae57078964..663126ff02 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -31,17 +31,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.InternodeOutboundMetrics;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.paxos.Commit;
-import org.apache.cassandra.utils.FBUtilities;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -99,22 +94,18 @@ public class RequestCallbacks implements
OutboundMessageCallbacks
/**
* Register the provided {@link RequestCallback}, inferring expiry and id
from the provided {@link Message}.
*/
- void addWithExpiration(RequestCallback cb, Message message,
InetAddressAndPort to)
+ public void addWithExpiration(RequestCallback<?> cb, Message<?> message,
InetAddressAndPort to)
{
- // mutations need to call the overload with a ConsistencyLevel
+ // mutations need to call the overload taking an AbstractWriteResponseHandler and a Replica
assert message.verb() != Verb.MUTATION_REQ && message.verb() !=
Verb.COUNTER_MUTATION_REQ;
CallbackInfo previous = callbacks.put(key(message.id(), to), new
CallbackInfo(message, to, cb));
assert previous == null : format("Callback already exists for id
%d/%s! (%s)", message.id(), to, previous);
}
- public void addWithExpiration(AbstractWriteResponseHandler<?> cb,
- Message<?> message,
- Replica to,
- ConsistencyLevel consistencyLevel,
- boolean allowHints)
+ public void addWithExpiration(AbstractWriteResponseHandler<?> cb,
Message<?> message, Replica to)
{
assert message.verb() == Verb.MUTATION_REQ || message.verb() ==
Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ;
- CallbackInfo previous = callbacks.put(key(message.id(),
to.endpoint()), new WriteCallbackInfo(message, to, cb, consistencyLevel,
allowHints));
+ CallbackInfo previous = callbacks.put(key(message.id(),
to.endpoint()), new CallbackInfo(message, to.endpoint(), cb));
assert previous == null : format("Callback already exists for id
%d/%s! (%s)", message.id(), to.endpoint(), previous);
}
@@ -274,11 +265,6 @@ public class RequestCallbacks implements
OutboundMessageCallbacks
return atNano > expiresAtNanos;
}
- boolean shouldHint()
- {
- return false;
- }
-
boolean invokeOnFailure()
{
return callback.invokeOnFailure();
@@ -290,53 +276,6 @@ public class RequestCallbacks implements
OutboundMessageCallbacks
}
}
- // FIXME: shouldn't need a specialized container for write callbacks;
hinting should be part of
- // AbstractWriteResponseHandler implementation.
- static class WriteCallbackInfo extends CallbackInfo
- {
- // either a Mutation, or a Paxos Commit (MessageOut)
- private final Object mutation;
- private final Replica replica;
-
- @VisibleForTesting
- WriteCallbackInfo(Message message, Replica replica, RequestCallback<?>
callback, ConsistencyLevel consistencyLevel, boolean allowHints)
- {
- super(message, replica.endpoint(), callback);
- this.mutation = shouldHint(allowHints, message, consistencyLevel)
? message.payload : null;
- //Local writes shouldn't go through messaging service
(https://issues.apache.org/jira/browse/CASSANDRA-10477)
- //noinspection AssertWithSideEffects
- assert !peer.equals(FBUtilities.getBroadcastAddressAndPort());
- this.replica = replica;
- }
-
- public boolean shouldHint()
- {
- return mutation != null && StorageProxy.shouldHint(replica);
- }
-
- public Replica getReplica()
- {
- return replica;
- }
-
- public Mutation mutation()
- {
- return getMutation(mutation);
- }
-
- private static Mutation getMutation(Object object)
- {
- assert object instanceof Commit || object instanceof Mutation :
object;
- return object instanceof Commit ? ((Commit) object).makeMutation()
- : (Mutation) object;
- }
-
- private static boolean shouldHint(boolean allowHints, Message
sentMessage, ConsistencyLevel consistencyLevel)
- {
- return allowHints && sentMessage.verb() !=
Verb.COUNTER_MUTATION_REQ && consistencyLevel != ConsistencyLevel.ANY;
- }
- }
-
@Override
public void onOverloaded(Message<?> message, InetAddressAndPort peer)
{
diff --git
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 313f714820..ce282661a8 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -282,7 +282,7 @@ public abstract class AbstractWriteResponseHandler<T>
implements RequestCallback
if (blockFor() + n > candidateReplicaCount())
signal();
- if (hintOnFailure != null)
+ if (hintOnFailure != null &&
StorageProxy.shouldHint(replicaPlan.lookup(from)))
StorageProxy.submitHint(hintOnFailure.get(),
replicaPlan.lookup(from), null);
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8499d625a4..2a0ccab793 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -779,7 +779,7 @@ public class StorageProxy implements StorageProxyMBean
if (replica.isSelf())
commitPaxosLocal(replica, message, responseHandler);
else
-
MessagingService.instance().sendWriteWithCallback(message, replica,
responseHandler, allowHints && shouldHint(replica));
+
MessagingService.instance().sendWriteWithCallback(message, replica,
responseHandler);
}
else
{
@@ -1529,7 +1529,7 @@ public class StorageProxy implements StorageProxyMBean
if (localDc != null)
{
for (Replica destination : localDc)
- MessagingService.instance().sendWriteWithCallback(message,
destination, responseHandler, true);
+ MessagingService.instance().sendWriteWithCallback(message,
destination, responseHandler);
}
if (dcGroups != null)
{
@@ -1571,7 +1571,7 @@ public class StorageProxy implements StorageProxyMBean
for (Replica replica : forwardToReplicas)
{
-
MessagingService.instance().callbacks.addWithExpiration(handler, message,
replica, handler.replicaPlan.consistencyLevel(), true);
+
MessagingService.instance().callbacks.addWithExpiration(handler, message,
replica);
logger.trace("Adding FWD message to {}@{}", message.id(),
replica);
}
@@ -1586,7 +1586,7 @@ public class StorageProxy implements StorageProxyMBean
target = targets.get(0);
}
- MessagingService.instance().sendWriteWithCallback(message, target,
handler, true);
+ MessagingService.instance().sendWriteWithCallback(message, target,
handler);
logger.trace("Sending message to {}@{}", message.id(), target);
}
@@ -1684,7 +1684,7 @@ public class StorageProxy implements StorageProxyMBean
Tracing.trace("Enqueuing counter update to {}", replica);
Message message = Message.outWithFlag(Verb.COUNTER_MUTATION_REQ,
cm, MessageFlag.CALL_BACK_ON_FAILURE);
- MessagingService.instance().sendWriteWithCallback(message,
replica, responseHandler, false);
+ MessagingService.instance().sendWriteWithCallback(message,
replica, responseHandler);
return responseHandler;
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/HintsDisabledTest.java
b/test/distributed/org/apache/cassandra/distributed/test/HintsDisabledTest.java
new file mode 100644
index 0000000000..64d023fa50
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/HintsDisabledTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import
com.google.monitoring.runtime.instrumentation.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.metrics.StorageMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.net.Verb.MUTATION_REQ;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class HintsDisabledTest extends TestBaseImpl
+{
+ @Test
+ public void testHintedHandoffDisabled() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withDataDirCount(1)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP)
+
.set("write_request_timeout", "10ms")
+
.set("hinted_handoff_enabled", false))
+ .start(), 2))
+ {
+ String createTableStatement = String.format("CREATE TABLE %s.cf (k
text PRIMARY KEY, c1 text) " +
+ "WITH compaction =
{'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'} ", KEYSPACE);
+ cluster.schemaChange(createTableStatement);
+
+ // Drop all MUTATION_REQ messages from node1 to node2; hints would be created here if hinted handoff were enabled
+ IMessageFilters.Filter drop1to2 =
cluster.filters().verbs(MUTATION_REQ.id).from(1).to(2).drop();
+
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k,
c1) VALUES (?, ?) USING TIMESTAMP 1;"),
+ ConsistencyLevel.ONE,
+ String.valueOf(1),
+ String.valueOf(1));
+
+ // Wait 15ms for the write to time out (write_request_timeout=10ms)
+ Uninterruptibles.sleepUninterruptibly(15, TimeUnit.MILLISECONDS);
+
+ // Check that no hints were created on node1
+ assertThat(cluster.get(1).callOnInstance(() ->
Long.valueOf(StorageMetrics.totalHints.getCount()))).isEqualTo(0L);
+ }
+ }
+
+ private static int getNumberOfSSTables(Cluster cluster, int node)
+ {
+ return cluster.get(node).callOnInstance(() ->
ColumnFamilyStore.getIfExists(KEYSPACE, "cf").getLiveSSTables().size());
+ }
+}
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
deleted file mode 100644
index 2bc24dc840..0000000000
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.net;
-
-import java.util.UUID;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.junit.Assert;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RegularAndStaticColumns;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.MockSchema;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.paxos.Commit;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import static org.apache.cassandra.locator.ReplicaUtils.full;
-import static org.apache.cassandra.service.paxos.Ballot.Flag.NONE;
-import static
org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallot;
-
-public class WriteCallbackInfoTest
-{
- private InetAddressAndPort testEp;
-
- @BeforeClass
- public static void initDD()
- {
- DatabaseDescriptor.daemonInitialization();
- }
-
- @Before
- public void setup() throws Exception
- {
- testEp = InetAddressAndPort.getByName("192.168.1.1");
-
StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(),
testEp);
- }
-
- @After
- public void teardown()
- {
- StorageService.instance.getTokenMetadata().removeEndpoint(testEp);
- }
-
- @Test
- public void testShouldHint() throws Exception
- {
- testShouldHint(Verb.COUNTER_MUTATION_REQ, ConsistencyLevel.ALL, true,
false);
- for (Verb verb : new Verb[] { Verb.PAXOS_COMMIT_REQ, Verb.MUTATION_REQ
})
- {
- testShouldHint(verb, ConsistencyLevel.ALL, true, true);
- testShouldHint(verb, ConsistencyLevel.ANY, true, false);
- testShouldHint(verb, ConsistencyLevel.ALL, false, false);
- }
- }
-
- private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean
allowHints, boolean expectHint)
- {
- TableMetadata metadata = MockSchema.newTableMetadata("", "");
- Object payload = verb == Verb.PAXOS_COMMIT_REQ
- ? new Commit(nextBallot(NONE), new
PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER,
RegularAndStaticColumns.NONE, 1).build())
- : new
Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
-
- RequestCallbacks.WriteCallbackInfo wcbi = new
RequestCallbacks.WriteCallbackInfo(Message.out(verb, payload), full(testEp),
null, cl, allowHints);
- Assert.assertEquals(expectHint, wcbi.shouldHint());
- if (expectHint)
- {
- Assert.assertNotNull(wcbi.mutation());
- }
- else
- {
- boolean fail = false;
- try
- {
- wcbi.mutation();
- }
- catch (Throwable t)
- {
- fail = true;
- }
- Assert.assertTrue(fail);
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]