This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit cd576a0d140c94007fd0181044df453c0681252f Merge: 726b67b1e4 5beab63b55 Author: Marcus Eriksson <[email protected]> AuthorDate: Mon Aug 29 13:27:46 2022 +0200 Merge branch 'cassandra-4.0' into cassandra-4.1 CHANGES.txt | 4 + src/java/org/apache/cassandra/config/Config.java | 1 + src/java/org/apache/cassandra/net/Verb.java | 2 +- .../cassandra/repair/AsymmetricRemoteSyncTask.java | 7 +- .../cassandra/repair/RepairMessageVerbHandler.java | 5 +- .../cassandra/repair/StreamingRepairTask.java | 9 +- .../cassandra/repair/SymmetricRemoteSyncTask.java | 10 --- src/java/org/apache/cassandra/repair/SyncTask.java | 11 +++ .../apache/cassandra/repair/ValidationTask.java | 9 +- .../cassandra/repair/messages/RepairMessage.java | 63 +++++++++++++ .../cassandra/service/ActiveRepairService.java | 2 - .../distributed/test/RepairCoordinatorBase.java | 3 + .../distributed/test/RepairRequestTimeoutTest.java | 100 +++++++++++++++++++++ .../upgrade/RepairRequestTimeoutUpgradeTest.java | 58 ++++++++++++ 14 files changed, 260 insertions(+), 24 deletions(-) diff --cc CHANGES.txt index 4335f0ea2f,96a37f53e9..bf0ea108f8 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,47 -1,12 +1,51 @@@ +4.1-alpha2 + * Fix BulkLoader to load entireSSTableThrottle and entireSSTableInterDcThrottle (CASSANDRA-17677) + * Fix a race condition where a keyspace can be opened while it is being removed (CASSANDRA-17658) + * DatabaseDescriptor will set the default failure detector during client initialization (CASSANDRA-17782) + * Avoid initializing schema via SystemKeyspace.getPreferredIP() with the BulkLoader tool (CASSANDRA-17740) + * Uncomment prepared_statements_cache_size, key_cache_size, counter_cache_size, index_summary_capacity which were + commented out by mistake in a previous patch + Fix breaking change with cache_load_timeout; cache_load_timeout_seconds <=0 and cache_load_timeout=0 are equivalent + and they both mean disabled + Deprecate public method setRate(final double throughputMbPerSec) in Compaction 
Manager in favor of + setRateInBytes(final double throughputBytesPerSec) + Revert breaking change removal of StressCQLSSTableWriter.Builder.withBufferSizeInMB(int size). Deprecate it in favor + of StressCQLSSTableWriter.Builder.withBufferSizeInMiB(int size) + Fix precision issues, add new -m flag (for nodetool/setstreamthroughput, nodetool/setinterdcstreamthroughput, + nodetool/getstreamthroughput and nodetool/getinterdcstreamthroughput), add new -d flags (nodetool/getstreamthroughput, nodetool/getinterdcstreamthroughput, nodetool/getcompactionthroughput) + Fix a bug with precision in nodetool/compactionstats + Deprecate StorageService methods and add new ones for stream_throughput_outbound, inter_dc_stream_throughput_outbound, + compaction_throughput_outbound in the JMX MBean `org.apache.cassandra.db:type=StorageService` + Removed getEntireSSTableStreamThroughputMebibytesPerSec in favor of new getEntireSSTableStreamThroughputMebibytesPerSecAsDouble + in the JMX MBean `org.apache.cassandra.db:type=StorageService` + Removed getEntireSSTableInterDCStreamThroughputMebibytesPerSec in favor of getEntireSSTableInterDCStreamThroughputMebibytesPerSecAsDouble + in the JMX MBean `org.apache.cassandra.db:type=StorageService` (CASSANDRA-17725) + * Fix sstable_preemptive_open_interval disabled value. sstable_preemptive_open_interval = null backward compatible with + sstable_preemptive_open_interval_in_mb = -1 (CASSANDRA-17737) + * Remove usages of Path#toFile() in the snapshot apparatus (CASSANDRA-17769) + * Fix Settings Virtual Table to update paxos_variant after startup and rename enable_uuid_sstable_identifiers to + uuid_sstable_identifiers_enabled as per our config naming conventions (CASSANDRA-17738) + * index_summary_resize_interval_in_minutes = -1 is equivalent to index_summary_resize_interval being set to null or + disabled. 
JMX MBean IndexSummaryManager, setResizeIntervalInMinutes method still takes resizeIntervalInMinutes = -1 for disabled (CASSANDRA-17735) + * min_tracked_partition_size_bytes parameter from 4.1 alpha1 was renamed to min_tracked_partition_size (CASSANDRA-17733) + * Remove commons-lang dependency during build runtime (CASSANDRA-17724) + * Relax synchronization on StreamSession#onError() to avoid deadlock (CASSANDRA-17706) + * Fix AbstractCell#toString throws MarshalException for cell in collection (CASSANDRA-17695) + * Add new vtable output option to compactionstats (CASSANDRA-17683) + * Fix commitLogUpperBound initialization in AbstractMemtableWithCommitlog (CASSANDRA-17587) + * Fix widening to long in getBatchSizeFailThreshold (CASSANDRA-17650) + * Fix widening from mebibytes to bytes in IntMebibytesBound (CASSANDRA-17716) + * Revert breaking change in nodetool clientstats and expose client options through nodetool clientstats --client-options. (CASSANDRA-17715) + * Fix missed nowInSec values in QueryProcessor (CASSANDRA-17458) + * Revert removal of withBufferSizeInMB(int size) in CQLSSTableWriter.Builder class and deprecate it in favor of withBufferSizeInMiB(int size) (CASSANDRA-17675) + * Remove expired snapshots of dropped tables after restart (CASSANDRA-17619) +Merged from 4.0: + 4.0.7 + * Avoid getting hanging repairs due to repair message timeouts (CASSANDRA-17613) - * Prevent infinite loop in repair coordinator on FailSession (CASSANDRA-17834) -Merged from 3.11: - * Fix potential IndexOutOfBoundsException in PagingState in mixed mode clusters (CASSANDRA-17840) -Merged from 3.0: - * Fix scrubber falling into infinite loop when the last partition is broken (CASSANDRA-17862) ++ + 4.0.6 + * Prevent infinite loop in repair coordinator on FailSession (CASSANDRA-17834) * Fix race condition on updating cdc size and advancing to next segment (CASSANDRA-17792) * Add 'noboolean' rpm build for older distros like CentOS7 (CASSANDRA-17765) * Fix default value for 
compaction_throughput_mb_per_sec in Config class to match the one in cassandra.yaml (CASSANDRA-17790) diff --cc src/java/org/apache/cassandra/config/Config.java index 1c5948dc73,f8d8d46db8..7fd9b05816 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -123,41 -91,28 +123,42 @@@ public class Confi /** Triggers automatic allocation of tokens if set, based on the provided replica count for a datacenter */ public Integer allocate_tokens_for_local_replication_factor = null; - public long native_transport_idle_timeout_in_ms = 0L; + @Replaces(oldName = "native_transport_idle_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) + public DurationSpec.LongMillisecondsBound native_transport_idle_timeout = new DurationSpec.LongMillisecondsBound("0ms"); + + @Replaces(oldName = "request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) + public volatile DurationSpec.LongMillisecondsBound request_timeout = new DurationSpec.LongMillisecondsBound("10000ms"); - public volatile long request_timeout_in_ms = 10000L; + @Replaces(oldName = "read_request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) + public volatile DurationSpec.LongMillisecondsBound read_request_timeout = new DurationSpec.LongMillisecondsBound("5000ms"); - public volatile long read_request_timeout_in_ms = 5000L; + @Replaces(oldName = "range_request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) + public volatile DurationSpec.LongMillisecondsBound range_request_timeout = new DurationSpec.LongMillisecondsBound("10000ms"); - public volatile long range_request_timeout_in_ms = 10000L; + @Replaces(oldName = "write_request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) + public volatile DurationSpec.LongMillisecondsBound write_request_timeout = new DurationSpec.LongMillisecondsBound("2000ms"); - public volatile 
long write_request_timeout_in_ms = 2000L; + @Replaces(oldName = "counter_write_request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) + public volatile DurationSpec.LongMillisecondsBound counter_write_request_timeout = new DurationSpec.LongMillisecondsBound("5000ms"); - public volatile long counter_write_request_timeout_in_ms = 5000L; + @Replaces(oldName = "cas_contention_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) + public volatile DurationSpec.LongMillisecondsBound cas_contention_timeout = new DurationSpec.LongMillisecondsBound("1800ms"); - public volatile long cas_contention_timeout_in_ms = 1000L; + @Replaces(oldName = "truncate_request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) + public volatile DurationSpec.LongMillisecondsBound truncate_request_timeout = new DurationSpec.LongMillisecondsBound("60000ms"); - public volatile long truncate_request_timeout_in_ms = 60000L; ++ @Replaces(oldName = "repair_request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) + public volatile DurationSpec.LongMillisecondsBound repair_request_timeout = new DurationSpec.LongMillisecondsBound("120000ms"); public Integer streaming_connections_per_host = 1; - public Integer streaming_keep_alive_period_in_secs = 300; //5 minutes + @Replaces(oldName = "streaming_keep_alive_period_in_secs", converter = Converters.SECONDS_DURATION, deprecated = true) + public DurationSpec.IntSecondsBound streaming_keep_alive_period = new DurationSpec.IntSecondsBound("300s"); - public boolean cross_node_timeout = true; + @Replaces(oldName = "cross_node_timeout", converter = Converters.IDENTITY, deprecated = true) + public boolean internode_timeout = true; - public volatile long slow_query_log_timeout_in_ms = 500L; + @Replaces(oldName = "slow_query_log_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) + public volatile 
DurationSpec.LongMillisecondsBound slow_query_log_timeout = new DurationSpec.LongMillisecondsBound("500ms"); public volatile double phi_convict_threshold = 8.0; diff --cc src/java/org/apache/cassandra/net/Verb.java index a562b37fb6,9d8b76dd35..d50a187fda --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@@ -158,22 -147,22 +158,22 @@@ public enum Ver SCHEMA_VERSION_REQ (20, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> SchemaVersionVerbHandler.instance, SCHEMA_VERSION_RSP ), // repair; mostly doesn't use callbacks and sends responses as their own request messages, with matching sessions by uuid; should eventually harmonize and make idiomatic - REPAIR_RSP (100, P1, repairMsgTimeout,REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), - VALIDATION_RSP (102, P1, longTimeout ,ANTI_ENTROPY, () -> ValidationResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - VALIDATION_REQ (101, P1, repairMsgTimeout,ANTI_ENTROPY, () -> ValidationRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - SYNC_RSP (104, P1, repairMsgTimeout,ANTI_ENTROPY, () -> SyncResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - SYNC_REQ (103, P1, repairMsgTimeout,ANTI_ENTROPY, () -> SyncRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - PREPARE_MSG (105, P1, repairMsgTimeout,ANTI_ENTROPY, () -> PrepareMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - SNAPSHOT_MSG (106, P1, repairMsgTimeout,ANTI_ENTROPY, () -> SnapshotMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - CLEANUP_MSG (107, P1, repairMsgTimeout,ANTI_ENTROPY, () -> CleanupMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - PREPARE_CONSISTENT_RSP (109, P1, repairMsgTimeout,ANTI_ENTROPY, () -> PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance, 
REPAIR_RSP ), - PREPARE_CONSISTENT_REQ (108, P1, repairMsgTimeout,ANTI_ENTROPY, () -> PrepareConsistentRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - FINALIZE_PROPOSE_MSG (110, P1, repairMsgTimeout,ANTI_ENTROPY, () -> FinalizePropose.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - FINALIZE_PROMISE_MSG (111, P1, repairMsgTimeout,ANTI_ENTROPY, () -> FinalizePromise.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - FINALIZE_COMMIT_MSG (112, P1, repairMsgTimeout,ANTI_ENTROPY, () -> FinalizeCommit.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - FAILED_SESSION_MSG (113, P1, repairMsgTimeout,ANTI_ENTROPY, () -> FailSession.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - STATUS_RSP (115, P1, repairMsgTimeout,ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), - STATUS_REQ (114, P1, repairMsgTimeout,ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + REPAIR_RSP (100, P1, repairTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), - VALIDATION_RSP (102, P1, repairTimeout, ANTI_ENTROPY, () -> ValidationResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), ++ VALIDATION_RSP (102, P1, longTimeout, ANTI_ENTROPY, () -> ValidationResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + VALIDATION_REQ (101, P1, repairTimeout, ANTI_ENTROPY, () -> ValidationRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + SYNC_RSP (104, P1, repairTimeout, ANTI_ENTROPY, () -> SyncResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + SYNC_REQ (103, P1, repairTimeout, ANTI_ENTROPY, () -> SyncRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + PREPARE_MSG (105, P1, repairTimeout, ANTI_ENTROPY, () -> PrepareMessage.serializer, () -> 
RepairMessageVerbHandler.instance, REPAIR_RSP ), + SNAPSHOT_MSG (106, P1, repairTimeout, ANTI_ENTROPY, () -> SnapshotMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + CLEANUP_MSG (107, P1, repairTimeout, ANTI_ENTROPY, () -> CleanupMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + PREPARE_CONSISTENT_RSP (109, P1, repairTimeout, ANTI_ENTROPY, () -> PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + PREPARE_CONSISTENT_REQ (108, P1, repairTimeout, ANTI_ENTROPY, () -> PrepareConsistentRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FINALIZE_PROPOSE_MSG (110, P1, repairTimeout, ANTI_ENTROPY, () -> FinalizePropose.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FINALIZE_PROMISE_MSG (111, P1, repairTimeout, ANTI_ENTROPY, () -> FinalizePromise.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FINALIZE_COMMIT_MSG (112, P1, repairTimeout, ANTI_ENTROPY, () -> FinalizeCommit.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FAILED_SESSION_MSG (113, P1, repairTimeout, ANTI_ENTROPY, () -> FailSession.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + STATUS_RSP (115, P1, repairTimeout, ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + STATUS_REQ (114, P1, repairTimeout, ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), REPLICATION_DONE_RSP (82, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), REPLICATION_DONE_REQ (22, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP), diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index da93413668,7488f2ebc1..58612f778a --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -33,8 -31,6 +33,7 @@@ import org.apache.cassandra.repair.stat import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.PreviewKind; - import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.TimeUUID; import static org.apache.cassandra.net.Verb.VALIDATION_RSP; @@@ -148,14 -124,17 +147,16 @@@ public class RepairMessageVerbHandler i break; case VALIDATION_REQ: + { + // notify initiator that the message has been received, allowing this method to take as long as it needs to + MessagingService.instance().send(message.emptyResponse(), message.from()); ValidationRequest validationRequest = (ValidationRequest) message.payload; logger.debug("Validating {}", validationRequest); - // trigger read-only compaction - ColumnFamilyStore store = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily); - if (store == null) + + ParticipateState participate = ActiveRepairService.instance.participate(desc.parentSessionId); + if (participate == null) { - logger.error("Table {}.{} was dropped during snapshot phase of repair {}", - desc.keyspace, desc.columnFamily, desc.parentSessionId); - MessagingService.instance().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from()); + logErrorAndSendFailureResponse("Unknown repair " + desc.parentSessionId, message); return; } @@@ -205,7 -145,8 +206,9 @@@ break; case SYNC_REQ: + { + // notify initiator that the message has been received, allowing this method to take as long as it needs to + MessagingService.instance().send(message.emptyResponse(), message.from()); // forwarded sync request SyncRequest request = (SyncRequest) message.payload; logger.debug("Syncing {}", request); diff --cc src/java/org/apache/cassandra/repair/StreamingRepairTask.java index 02697b6977,9c6caf46f5..ea50a50068 --- 
a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@@ -37,9 -38,12 +37,13 @@@ import org.apache.cassandra.streaming.S import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.utils.TimeUUID; + import static java.util.concurrent.TimeUnit.MILLISECONDS; + import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.net.Verb.SYNC_RSP; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; ++import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; + /** * StreamingRepairTask performs data streaming between two remote replicas, neither of which is repair coordinator. diff --cc src/java/org/apache/cassandra/repair/SyncTask.java index e254d93883,24e206828d..b325eb42ad --- a/src/java/org/apache/cassandra/repair/SyncTask.java +++ b/src/java/org/apache/cassandra/repair/SyncTask.java @@@ -35,9 -37,9 +37,10 @@@ import org.apache.cassandra.repair.mess import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.Tracing; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; + import static org.apache.cassandra.net.Verb.SYNC_REQ; -public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable +public abstract class SyncTask extends AsyncFuture<SyncStat> implements Runnable { private static final Logger logger = LoggerFactory.getLogger(SyncTask.class); @@@ -103,4 -105,12 +106,12 @@@ } public void abort() {} + + void sendRequest(SyncRequest request, InetAddressAndPort to) + { + RepairMessage.sendMessageWithFailureCB(request, + SYNC_REQ, + to, - this::setException); ++ this::tryFailure); + } } diff --cc src/java/org/apache/cassandra/repair/ValidationTask.java index 616a2d806b,b4aef249c5..2ad17612d8 --- 
a/src/java/org/apache/cassandra/repair/ValidationTask.java +++ b/src/java/org/apache/cassandra/repair/ValidationTask.java @@@ -56,8 -52,10 +55,10 @@@ public class ValidationTask extends Asy */ public void run() { - ValidationRequest request = new ValidationRequest(desc, nowInSec); - MessagingService.instance().send(Message.out(VALIDATION_REQ, request), endpoint); + RepairMessage.sendMessageWithFailureCB(new ValidationRequest(desc, nowInSec), + VALIDATION_REQ, + endpoint, - this::setException); ++ this::tryFailure); } /** diff --cc src/java/org/apache/cassandra/repair/messages/RepairMessage.java index 3137b4e474,165911dc9b..00ce888696 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@@ -17,7 -17,23 +17,23 @@@ */ package org.apache.cassandra.repair.messages; -import java.util.UUID; - + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import org.apache.cassandra.exceptions.RepairException; + import org.apache.cassandra.exceptions.RequestFailureReason; + import org.apache.cassandra.gms.Gossiper; + import org.apache.cassandra.locator.InetAddressAndPort; + import org.apache.cassandra.net.Message; + import org.apache.cassandra.net.MessagingService; + import org.apache.cassandra.net.RequestCallback; + import org.apache.cassandra.net.Verb; import org.apache.cassandra.repair.RepairJobDesc; ++import org.apache.cassandra.streaming.PreviewKind; + import org.apache.cassandra.utils.CassandraVersion; ++import org.apache.cassandra.utils.TimeUUID; + + import static org.apache.cassandra.net.MessageFlag.CALL_BACK_ON_FAILURE; /** * Base class of all repair related request/response messages. 
@@@ -32,4 -50,49 +50,49 @@@ public abstract class RepairMessag { this.desc = desc; } + + public interface RepairFailureCallback + { + void onFailure(Exception e); + } + + public static void sendMessageWithFailureCB(RepairMessage request, Verb verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback) + { + RequestCallback<?> callback = new RequestCallback<Object>() + { + @Override + public void onResponse(Message<Object> msg) + { + logger.info("[#{}] {} received by {}", request.desc.parentSessionId, verb, endpoint); + // todo: at some point we should make repair messages follow the normal path, actually using this + } + + @Override + public boolean invokeOnFailure() + { + return true; + } + + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) + { + logger.error("[#{}] {} failed on {}: {}", request.desc.parentSessionId, verb, from, failureReason); + + if (supportsTimeouts(from, request.desc.parentSessionId)) - failureCallback.onFailure(new RepairException(request.desc, String.format("Got %s failure from %s: %s", verb, from, failureReason))); ++ failureCallback.onFailure(RepairException.error(request.desc, PreviewKind.NONE, String.format("Got %s failure from %s: %s", verb, from, failureReason))); + } + }; + + MessagingService.instance().sendWithCallback(Message.outWithFlag(verb, request, CALL_BACK_ON_FAILURE), + endpoint, + callback); + } + - private static boolean supportsTimeouts(InetAddressAndPort from, UUID parentSessionId) ++ private static boolean supportsTimeouts(InetAddressAndPort from, TimeUUID parentSessionId) + { + CassandraVersion remoteVersion = Gossiper.instance.getReleaseVersion(from); + if (remoteVersion != null && remoteVersion.compareTo(SUPPORTS_TIMEOUTS) >= 0) + return true; + logger.warn("[#{}] Not failing repair due to remote host {} not supporting repair message timeouts (version = {})", parentSessionId, from, remoteVersion); + return false; + } } diff --cc 
src/java/org/apache/cassandra/service/ActiveRepairService.java index 745067d6cd,4a990f662c..68247eaf58 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -108,29 -87,15 +108,27 @@@ import org.apache.cassandra.repair.mess import org.apache.cassandra.repair.messages.SyncResponse; import org.apache.cassandra.repair.messages.ValidationResponse; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.FutureCombiner; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; - import org.checkerframework.checker.nullness.qual.Nullable; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.transform; +import static java.util.Collections.synchronizedSet; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; +import static org.apache.cassandra.config.Config.RepairCommandPoolFullStrategy.reject; +import static org.apache.cassandra.config.DatabaseDescriptor.*; +import static org.apache.cassandra.net.Message.out; import static org.apache.cassandra.net.Verb.PREPARE_MSG; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; +import static org.apache.cassandra.utils.Simulate.With.MONITORS; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; - import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; +import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch; /** * ActiveRepairService is the starting point for 
manual "active" repairs. diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java index fc058dbfe1,fc058dbfe1..f1266fafaf --- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java @@@ -22,6 -22,6 +22,7 @@@ import java.io.IOException import java.util.ArrayList; import java.util.Collection; import java.util.List; ++import java.util.concurrent.RejectedExecutionException; import org.junit.AfterClass; import org.junit.BeforeClass; @@@ -78,6 -78,6 +79,8 @@@ public class RepairCoordinatorBase exte .withConfig(c -> c.with(Feature.NETWORK) .with(Feature.GOSSIP)) .start()); ++ ++ CLUSTER.setUncaughtExceptionsFilter(throwable -> throwable instanceof RejectedExecutionException && "RepairJobTask has shut down".equals(throwable.getMessage())); } @AfterClass diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/RepairRequestTimeoutUpgradeTest.java index 0000000000,0000000000..9ef4fc0c83 new file mode 100644 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/RepairRequestTimeoutUpgradeTest.java @@@ -1,0 -1,0 +1,58 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.cassandra.distributed.upgrade; ++ ++import org.junit.Test; ++ ++import com.vdurmont.semver4j.Semver; ++import org.apache.cassandra.distributed.api.ConsistencyLevel; ++import org.apache.cassandra.distributed.api.Feature; ++ ++import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; ++import static org.apache.cassandra.distributed.shared.AssertUtils.row; ++import static org.apache.cassandra.net.Verb.SYNC_REQ; ++import static org.apache.cassandra.net.Verb.VALIDATION_REQ; ++ ++public class RepairRequestTimeoutUpgradeTest extends UpgradeTestBase ++{ ++ @Test ++ public void simpleUpgradeWithNetworkAndGossipTest() throws Throwable ++ { ++ new TestCase() ++ .nodes(2) ++ .nodesToUpgrade(1) ++ .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP).set("repair_request_timeout_in_ms", 1000)) ++ .upgrades(v40, v41) ++ .setup((cluster) -> { ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); ++ for (int i = 0; i < 10; i++) ++ cluster.get(i % 2 + 1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES ("+i+", 1, 1)"); ++ cluster.forEach(i -> i.flush(KEYSPACE)); ++ }) ++ .runAfterNodeUpgrade((cluster, node) -> { ++ cluster.filters().verbs(VALIDATION_REQ.id).drop(); ++ cluster.get(2).nodetoolResult("repair", KEYSPACE, "-full").asserts().failure(); ++ cluster.filters().reset(); ++ for (int i = 10; i < 20; i++) ++ cluster.get(i % 2 + 1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES ("+i+", 1, 1)"); ++ cluster.forEach(i -> i.flush(KEYSPACE)); ++ cluster.get(1).nodetoolResult("repair", KEYSPACE, "-full").asserts().success(); ++ }).run(); ++ } ++} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
