This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new 5beab63b55 Improve the way we handle repair message timeouts to avoid
hanging repairs
5beab63b55 is described below
commit 5beab63b5550efb5e31e5005f90649661a9fe595
Author: Marcus Eriksson <marcuse@apache.org>
AuthorDate: Mon Aug 29 13:27:16 2022 +0200
Improve the way we handle repair message timeouts to avoid hanging repairs
Patch by marcuse; reviewed by David Capwell for CASSANDRA-17613
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 10 +++
src/java/org/apache/cassandra/net/Verb.java | 33 +++----
.../cassandra/repair/AsymmetricRemoteSyncTask.java | 7 +-
.../cassandra/repair/RepairMessageVerbHandler.java | 4 +
.../cassandra/repair/StreamingRepairTask.java | 9 +-
.../cassandra/repair/SymmetricRemoteSyncTask.java | 11 ---
src/java/org/apache/cassandra/repair/SyncTask.java | 12 +++
.../apache/cassandra/repair/ValidationTask.java | 9 +-
.../cassandra/repair/messages/RepairMessage.java | 63 +++++++++++++
.../apache/cassandra/service/StorageService.java | 13 +++
.../cassandra/service/StorageServiceMBean.java | 3 +
.../distributed/test/RepairRequestTimeoutTest.java | 100 +++++++++++++++++++++
14 files changed, 240 insertions(+), 37 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index e1f8d19c30..96a37f53e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.7
+ * Avoid getting hanging repairs due to repair message timeouts
(CASSANDRA-17613)
* Prevent infinite loop in repair coordinator on FailSession (CASSANDRA-17834)
Merged from 3.11:
* Fix potential IndexOutOfBoundsException in PagingState in mixed mode
clusters (CASSANDRA-17840)
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index e8aa297e77..f8d8d46db8 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -134,6 +134,8 @@ public class Config
public volatile Integer repair_session_max_tree_depth = null;
public volatile Integer repair_session_space_in_mb = null;
+ public volatile long repair_request_timeout_in_ms =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
+
public volatile boolean use_offheap_merkle_trees = true;
public int storage_port = 7000;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 82a5ec73ba..f78a5b668a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1723,6 +1723,16 @@ public class DatabaseDescriptor
return unit.convert(getBlockForPeersTimeoutInSeconds(),
TimeUnit.SECONDS);
}
+ public static long getRepairRpcTimeout(TimeUnit unit)
+ {
+ return unit.convert(conf.repair_request_timeout_in_ms, MILLISECONDS);
+ }
+
+ public static void setRepairRpcTimeout(long time, TimeUnit unit)
+ {
+ conf.repair_request_timeout_in_ms = MILLISECONDS.convert(time, unit);
+ }
+
public static double getPhiConvictThreshold()
{
return conf.phi_convict_threshold;
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index fad2fbf6a9..9d8b76dd35 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -147,22 +147,22 @@ public enum Verb
SCHEMA_VERSION_REQ (20, P1, rpcTimeout, MIGRATION, () ->
NoPayload.serializer, () -> SchemaVersionVerbHandler.instance,
SCHEMA_VERSION_RSP ),
// repair; mostly doesn't use callbacks and sends responses as their own
request messages, with matching sessions by uuid; should eventually harmonize
and make idiomatic
- REPAIR_RSP (100, P1, rpcTimeout, REQUEST_RESPONSE, () ->
NoPayload.serializer, () -> ResponseVerbHandler.instance
),
- VALIDATION_RSP (102, P1, rpcTimeout, ANTI_ENTROPY, () ->
ValidationResponse.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- VALIDATION_REQ (101, P1, rpcTimeout, ANTI_ENTROPY, () ->
ValidationRequest.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- SYNC_RSP (104, P1, rpcTimeout, ANTI_ENTROPY, () ->
SyncResponse.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- SYNC_REQ (103, P1, rpcTimeout, ANTI_ENTROPY, () ->
SyncRequest.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- PREPARE_MSG (105, P1, rpcTimeout, ANTI_ENTROPY, () ->
PrepareMessage.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- SNAPSHOT_MSG (106, P1, rpcTimeout, ANTI_ENTROPY, () ->
SnapshotMessage.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- CLEANUP_MSG (107, P1, rpcTimeout, ANTI_ENTROPY, () ->
CleanupMessage.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- PREPARE_CONSISTENT_RSP (109, P1, rpcTimeout, ANTI_ENTROPY, () ->
PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- PREPARE_CONSISTENT_REQ (108, P1, rpcTimeout, ANTI_ENTROPY, () ->
PrepareConsistentRequest.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- FINALIZE_PROPOSE_MSG (110, P1, rpcTimeout, ANTI_ENTROPY, () ->
FinalizePropose.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- FINALIZE_PROMISE_MSG (111, P1, rpcTimeout, ANTI_ENTROPY, () ->
FinalizePromise.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- FINALIZE_COMMIT_MSG (112, P1, rpcTimeout, ANTI_ENTROPY, () ->
FinalizeCommit.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- FAILED_SESSION_MSG (113, P1, rpcTimeout, ANTI_ENTROPY, () ->
FailSession.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- STATUS_RSP (115, P1, rpcTimeout, ANTI_ENTROPY, () ->
StatusResponse.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
- STATUS_REQ (114, P1, rpcTimeout, ANTI_ENTROPY, () ->
StatusRequest.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ REPAIR_RSP (100, P1, repairMsgTimeout,REQUEST_RESPONSE, () ->
NoPayload.serializer, () -> ResponseVerbHandler.instance
),
+ VALIDATION_RSP (102, P1, longTimeout ,ANTI_ENTROPY, () ->
ValidationResponse.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ VALIDATION_REQ (101, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
ValidationRequest.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ SYNC_RSP (104, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
SyncResponse.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ SYNC_REQ (103, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
SyncRequest.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ PREPARE_MSG (105, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
PrepareMessage.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ SNAPSHOT_MSG (106, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
SnapshotMessage.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ CLEANUP_MSG (107, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
CleanupMessage.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ PREPARE_CONSISTENT_RSP (109, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ PREPARE_CONSISTENT_REQ (108, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
PrepareConsistentRequest.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ FINALIZE_PROPOSE_MSG (110, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
FinalizePropose.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ FINALIZE_PROMISE_MSG (111, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
FinalizePromise.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ FINALIZE_COMMIT_MSG (112, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
FinalizeCommit.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ FAILED_SESSION_MSG (113, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
FailSession.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ STATUS_RSP (115, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
StatusResponse.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
+ STATUS_REQ (114, P1, repairMsgTimeout,ANTI_ENTROPY, () ->
StatusRequest.serializer, () -> RepairMessageVerbHandler.instance,
REPAIR_RSP ),
REPLICATION_DONE_RSP (82, P0, rpcTimeout, MISC, () ->
NoPayload.serializer, () -> ResponseVerbHandler.instance
),
REPLICATION_DONE_REQ (22, P0, rpcTimeout, MISC, () ->
NoPayload.serializer, () ->
ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
@@ -450,4 +450,5 @@ class VerbTimeouts
static final ToLongFunction<TimeUnit> pingTimeout =
DatabaseDescriptor::getPingTimeout;
static final ToLongFunction<TimeUnit> longTimeout = units ->
Math.max(DatabaseDescriptor.getRpcTimeout(units), units.convert(5L,
TimeUnit.MINUTES));
static final ToLongFunction<TimeUnit> noTimeout = units -> { throw
new IllegalStateException(); };
+ static final ToLongFunction<TimeUnit> repairMsgTimeout=
DatabaseDescriptor::getRepairRpcTimeout;
}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index 40a1f514be..9762c9f08f 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -20,20 +20,17 @@ package org.apache.cassandra.repair;
import java.util.List;
+
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.net.Verb.SYNC_REQ;
-
/**
* AsymmetricRemoteSyncTask sends {@link SyncRequest} to target node to
repair(stream)
* data with other target replica.
@@ -53,7 +50,7 @@ public class AsymmetricRemoteSyncTask extends SyncTask
implements CompletableRem
SyncRequest request = new SyncRequest(desc, local,
nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, true);
String message = String.format("Forwarding streaming repair of %d
ranges to %s (to be streamed with %s)", request.ranges.size(), request.src,
request.dst);
Tracing.traceRepair(message);
- MessagingService.instance().send(Message.out(SYNC_REQ, request),
request.src);
+ sendRequest(request, request.src);
}
public void syncComplete(boolean success, List<SessionSummary> summaries)
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index bfc2657ab4..7488f2ebc1 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -124,6 +124,8 @@ public class RepairMessageVerbHandler implements
IVerbHandler<RepairMessage>
break;
case VALIDATION_REQ:
+ // notify initiator that the message has been received,
allowing this method to take as long as it needs to
+ MessagingService.instance().send(message.emptyResponse(),
message.from());
ValidationRequest validationRequest = (ValidationRequest)
message.payload;
logger.debug("Validating {}", validationRequest);
// trigger read-only compaction
@@ -143,6 +145,8 @@ public class RepairMessageVerbHandler implements
IVerbHandler<RepairMessage>
break;
case SYNC_REQ:
+ // notify initiator that the message has been received,
allowing this method to take as long as it needs to
+ MessagingService.instance().send(message.emptyResponse(),
message.from());
// forwarded sync request
SyncRequest request = (SyncRequest) message.payload;
logger.debug("Syncing {}", request);
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index fbfbac8748..9c6caf46f5 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -39,7 +39,11 @@ import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.streaming.StreamOperation;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.net.Verb.SYNC_RSP;
+import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+
/**
* StreamingRepairTask performs data streaming between two remote replicas,
neither of which is repair coordinator.
@@ -73,7 +77,10 @@ public class StreamingRepairTask implements Runnable,
StreamEventHandler
public void run()
{
logger.info("[streaming task #{}] Performing {}streaming repair of {}
ranges with {}", desc.sessionId, asymmetric ? "asymmetric " : "",
ranges.size(), dst);
- createStreamPlan(dst).execute();
+ long start = approxTime.now();
+ StreamPlan streamPlan = createStreamPlan(dst);
+ logger.info("[streaming task #{}] Stream plan created in {}ms",
desc.sessionId, MILLISECONDS.convert(approxTime.now() - start, NANOSECONDS));
+ streamPlan.execute();
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
index b4e2d9c5a1..629f4bb8aa 100644
--- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
@@ -27,17 +27,12 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.net.Verb.SYNC_REQ;
-
/**
* SymmetricRemoteSyncTask sends {@link SyncRequest} to
remote(non-coordinator) node
* to repair(stream) data with other replica.
@@ -53,16 +48,10 @@ public class SymmetricRemoteSyncTask extends SyncTask
implements CompletableRemo
super(desc, r1, r2, differences, previewKind);
}
- void sendRequest(SyncRequest request, InetAddressAndPort to)
- {
- MessagingService.instance().send(Message.out(SYNC_REQ, request), to);
- }
-
@Override
protected void startSync()
{
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
-
SyncRequest request = new SyncRequest(desc, local,
nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, false);
Preconditions.checkArgument(nodePair.coordinator.equals(request.src));
String message = String.format("Forwarding streaming repair of %d
ranges to %s (to be streamed with %s)", request.ranges.size(), request.src,
request.dst);
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java
b/src/java/org/apache/cassandra/repair/SyncTask.java
index fe9f09ed7e..24e206828d 100644
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -32,9 +32,13 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.Tracing;
+import static org.apache.cassandra.net.Verb.SYNC_REQ;
+
public abstract class SyncTask extends AbstractFuture<SyncStat> implements
Runnable
{
private static final Logger logger =
LoggerFactory.getLogger(SyncTask.class);
@@ -101,4 +105,12 @@ public abstract class SyncTask extends
AbstractFuture<SyncStat> implements Runna
}
public void abort() {}
+
+ void sendRequest(SyncRequest request, InetAddressAndPort to)
+ {
+ RepairMessage.sendMessageWithFailureCB(request,
+ SYNC_REQ,
+ to,
+ this::setException);
+ }
}
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java
b/src/java/org/apache/cassandra/repair/ValidationTask.java
index 0161acf8d8..b4aef249c5 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -21,8 +21,7 @@ import com.google.common.util.concurrent.AbstractFuture;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.MerkleTrees;
@@ -53,8 +52,10 @@ public class ValidationTask extends
AbstractFuture<TreeResponse> implements Runn
*/
public void run()
{
- ValidationRequest request = new ValidationRequest(desc, nowInSec);
- MessagingService.instance().send(Message.out(VALIDATION_REQ, request),
endpoint);
+ RepairMessage.sendMessageWithFailureCB(new ValidationRequest(desc,
nowInSec),
+ VALIDATION_REQ,
+ endpoint,
+ this::setException);
}
/**
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 3137b4e474..165911dc9b 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -17,7 +17,23 @@
*/
package org.apache.cassandra.repair.messages;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.utils.CassandraVersion;
+
+import static org.apache.cassandra.net.MessageFlag.CALL_BACK_ON_FAILURE;
/**
* Base class of all repair related request/response messages.
@@ -26,10 +42,57 @@ import org.apache.cassandra.repair.RepairJobDesc;
*/
public abstract class RepairMessage
{
+ private static final CassandraVersion SUPPORTS_TIMEOUTS = new
CassandraVersion("4.0.7-SNAPSHOT");
+ private static final Logger logger =
LoggerFactory.getLogger(RepairMessage.class);
public final RepairJobDesc desc;
protected RepairMessage(RepairJobDesc desc)
{
this.desc = desc;
}
+
+ public interface RepairFailureCallback
+ {
+ void onFailure(Exception e);
+ }
+
+ public static void sendMessageWithFailureCB(RepairMessage request, Verb
verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
+ {
+ RequestCallback<?> callback = new RequestCallback<Object>()
+ {
+ @Override
+ public void onResponse(Message<Object> msg)
+ {
+ logger.info("[#{}] {} received by {}",
request.desc.parentSessionId, verb, endpoint);
+ // todo: at some point we should make repair messages follow
the normal path, actually using this
+ }
+
+ @Override
+ public boolean invokeOnFailure()
+ {
+ return true;
+ }
+
+ public void onFailure(InetAddressAndPort from,
RequestFailureReason failureReason)
+ {
+ logger.error("[#{}] {} failed on {}: {}",
request.desc.parentSessionId, verb, from, failureReason);
+
+ if (supportsTimeouts(from, request.desc.parentSessionId))
+ failureCallback.onFailure(new
RepairException(request.desc, String.format("Got %s failure from %s: %s", verb,
from, failureReason)));
+ }
+ };
+
+ MessagingService.instance().sendWithCallback(Message.outWithFlag(verb,
request, CALL_BACK_ON_FAILURE),
+ endpoint,
+ callback);
+ }
+
+ private static boolean supportsTimeouts(InetAddressAndPort from, UUID
parentSessionId)
+ {
+ CassandraVersion remoteVersion =
Gossiper.instance.getReleaseVersion(from);
+ if (remoteVersion != null &&
remoteVersion.compareTo(SUPPORTS_TIMEOUTS) >= 0)
+ return true;
+ logger.warn("[#{}] Not failing repair due to remote host {} not
supporting repair message timeouts (version = {})", parentSessionId, from,
remoteVersion);
+ return false;
+ }
}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 4568c5af8f..4721113ad3 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -6043,4 +6043,17 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
logger.info("Changing keyspace count warn threshold from {} to {}",
getKeyspaceCountWarnThreshold(), value);
DatabaseDescriptor.setKeyspaceCountWarnThreshold(value);
}
+
+ public Long getRepairRpcTimeout()
+ {
+ return DatabaseDescriptor.getRepairRpcTimeout(MILLISECONDS);
+ }
+
+ public void setRepairRpcTimeout(Long timeoutInMillis)
+ {
+ Preconditions.checkState(timeoutInMillis > 0);
+ DatabaseDescriptor.setRepairRpcTimeout(timeoutInMillis, MILLISECONDS);
+ logger.info("RepairRpcTimeout set to {}ms via JMX", timeoutInMillis);
+ }
+
}
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 30e643bc1d..c61e45e7a4 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -862,4 +862,7 @@ public interface StorageServiceMBean extends
NotificationEmitter
void setTableCountWarnThreshold(int value);
int getKeyspaceCountWarnThreshold();
void setKeyspaceCountWarnThreshold(int value);
+
+ public Long getRepairRpcTimeout();
+ public void setRepairRpcTimeout(Long timeoutInMillis);
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/RepairRequestTimeoutTest.java
b/test/distributed/org/apache/cassandra/distributed/test/RepairRequestTimeoutTest.java
new file mode 100644
index 0000000000..33e0e78724
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/RepairRequestTimeoutTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.net.Verb;
+
+import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.net.Verb.SYNC_REQ;
+import static org.apache.cassandra.net.Verb.VALIDATION_REQ;
+import static org.junit.Assert.assertTrue;
+
+public class RepairRequestTimeoutTest extends TestBaseImpl
+{
+ static Cluster CLUSTER;
+ static final long timeoutMillis = 1000;
+ @BeforeClass
+ public static void setup() throws IOException
+ {
+ CLUSTER = init(Cluster.build(3)
+ .withConfig(config -> config.with(GOSSIP,
NETWORK).set("repair_request_timeout_in_ms", timeoutMillis))
+ .start());
+ CLUSTER.schemaChange(withKeyspace("create table %s.tbl (id int primary
key)"));
+ }
+
+ @Before
+ public void before()
+ {
+ CLUSTER.filters().reset();
+ }
+
+ @Test
+ public void testLostSYNC_REQ()
+ {
+ testLostMessageHelper(SYNC_REQ);
+ }
+
+ @Test
+ public void testLostVALIDATION_REQ()
+ {
+ testLostMessageHelper(VALIDATION_REQ);
+ }
+
+ public void testLostMessageHelper(Verb verb)
+ {
+ for (int i = 0; i < 10; i++)
+ CLUSTER.coordinator(1).execute(withKeyspace("insert into %s.tbl
(id) values (?)"), ConsistencyLevel.ALL, i);
+ for (int i = 10; i < 20; i++)
+ CLUSTER.get((i % 3) + 1).executeInternal(withKeyspace("insert into
%s.tbl (id) values (?)"), i);
+ CLUSTER.forEach(i -> i.flush(KEYSPACE));
+ CLUSTER.filters().verbs(verb.id).drop();
+ // symmetric vs asymmetric:
+ CLUSTER.get(1).nodetoolResult("repair", "-full", KEYSPACE,
"tbl").asserts().failure().notificationContains(verb + " failure from");
+ CLUSTER.get(1).nodetoolResult("repair", "-full", "-os", KEYSPACE,
"tbl").asserts().failure().notificationContains(verb + " failure from");
+
+ // and success
+ CLUSTER.filters().reset();
+ long mark = CLUSTER.get(1).logs().mark();
+
+ CLUSTER.get(1).nodetoolResult("repair", "-full", KEYSPACE,
"tbl").asserts().success();
+ for (int i = 10; i < 20; i++)
+ CLUSTER.get((i % 3) + 1).executeInternal(withKeyspace("insert into
%s.tbl (id) values (?)"), i);
+
+ CLUSTER.get(1).nodetoolResult("repair", "-full", "-os", KEYSPACE,
"tbl").asserts().success();
+ CLUSTER.get(1).runOnInstance(() -> {
+ // make sure we don't get any expirations after the repair has
finished
+ long expirationInterval =
DatabaseDescriptor.getMinRpcTimeout(MILLISECONDS) / 2; // see
RequestCallbacks.java
+ sleepUninterruptibly((timeoutMillis + expirationInterval) * 2,
MILLISECONDS);
+ });
+
+ assertTrue(CLUSTER.get(1).logs().grep(mark, "failure
from").getResult().isEmpty());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org