Revert CASSANDRA-6132 (pushing to 2.0 only)
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cf38e9e6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cf38e9e6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cf38e9e6 Branch: refs/heads/cassandra-2.0 Commit: cf38e9e6f96e692909e7669c053e372a638605e7 Parents: 731e83b Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Oct 4 16:31:00 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Oct 4 16:31:29 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 - .../cassandra/config/DatabaseDescriptor.java | 3 -- .../org/apache/cassandra/dht/BootStrapper.java | 8 ++--- .../org/apache/cassandra/net/CallbackInfo.java | 17 +++++++-- .../apache/cassandra/net/MessagingService.java | 28 +++++++-------- .../apache/cassandra/net/WriteCallbackInfo.java | 26 -------------- .../apache/cassandra/service/StorageProxy.java | 38 +++++--------------- 7 files changed, 40 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c1d1991..3dc5e77 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,4 @@ 1.2.11 - * Never return WriteTimeout for CL.ANY (CASSANDRA-6032) * Tracing should log write failure rather than raw exceptions (CASSANDRA-6133) * lock access to TM.endpointToHostIdMap (CASSANDRA-6103) * Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 218f719..633ea9a 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -38,7 +38,6 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DefsTable; import org.apache.cassandra.db.SystemTable; -import org.apache.cassandra.dht.BootStrapper; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSWriteError; @@ -840,8 +839,6 @@ public class DatabaseDescriptor case READ_REPAIR: case MUTATION: return getWriteRpcTimeout(); - case BOOTSTRAP_TOKEN: - return BootStrapper.BOOTSTRAP_TIMEOUT; default: return getRpcTimeout(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 2e79562..ff76534 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -48,8 +48,6 @@ import org.apache.cassandra.net.*; public class BootStrapper { - public static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of 30s - private static final Logger logger = LoggerFactory.getLogger(BootStrapper.class); /* endpoint that needs to be bootstrapped */ @@ -57,6 +55,7 @@ public class BootStrapper /* token of the node being bootstrapped. */ protected final Collection<Token> tokens; protected final TokenMetadata tokenMetadata; + private static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of 30s public BootStrapper(InetAddress address, Collection<Token> tokens, TokenMetadata tmd) { @@ -188,12 +187,13 @@ public class BootStrapper { MessageOut message = new MessageOut(MessagingService.Verb.BOOTSTRAP_TOKEN); int retries = 5; + long timeout = Math.max(DatabaseDescriptor.getRpcTimeout(), BOOTSTRAP_TIMEOUT); while (retries > 0) { BootstrapTokenCallback btc = new BootstrapTokenCallback(); - MessagingService.instance().sendRR(message, maxEndpoint, btc); - Token token = btc.getToken(BOOTSTRAP_TIMEOUT); + MessagingService.instance().sendRR(message, maxEndpoint, btc, timeout); + Token token = btc.getToken(timeout); if (token != null) return token; http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/net/CallbackInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java index f90df8d..f0e48e9 100644 --- a/src/java/org/apache/cassandra/net/CallbackInfo.java +++ b/src/java/org/apache/cassandra/net/CallbackInfo.java @@ -31,6 +31,7 @@ public class CallbackInfo { protected final InetAddress target; protected final IMessageCallback callback; + protected final MessageOut<?> sentMessage; protected final IVersionedSerializer<?> serializer; /** @@ -40,15 +41,27 @@ public class CallbackInfo * @param callback * @param serializer serializer to deserialize response message */ - public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer) + public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer) + { + this(target, callback, null, serializer); + } + + public CallbackInfo(InetAddress target, IMessageCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer) { this.target = target; this.callback = callback; + this.sentMessage = sentMessage; this.serializer = serializer; } + /** + * @return TRUE iff a hint should be written for this target. + * + * NOTE: + * Assumes it is only called after the write of "sentMessage" to "target" has timed out. + */ public boolean shouldHint() { - return false; + return sentMessage != null && StorageProxy.shouldHint(target); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index c9b0047..a199e83 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -329,7 +329,8 @@ public final class MessagingService implements MessagingServiceMBean if (expiredCallbackInfo.shouldHint()) { - RowMutation rm = (RowMutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload; + assert expiredCallbackInfo.sentMessage != null; + RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload; return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null, null); } @@ -521,18 +522,15 @@ public final class MessagingService implements MessagingServiceMBean public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout) { - assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel String messageId = nextId(); - CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); - assert previous == null; - return messageId; - } + CallbackInfo previous; + + // If HH is enabled and this is a mutation message => store the message to track for potential hints. + if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION) + previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout); + else + previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); - public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel) - { - assert message.verb == Verb.MUTATION; - String messageId = nextId(); - CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout); assert previous == null; return messageId; } @@ -550,7 +548,7 @@ public final class MessagingService implements MessagingServiceMBean */ public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb) { - return sendRR(message, to, cb, message.getTimeout(), null); + return sendRR(message, to, cb, message.getTimeout()); } /** @@ -567,11 +565,9 @@ public final class MessagingService implements MessagingServiceMBean * @param timeout the timeout used for expiration * @return an reference to message id used to match with the result */ - public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout, ConsistencyLevel consistencyLevel) + public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout) { - String id = consistencyLevel == null - ? addCallback(cb, message, to, timeout) - : addCallback(cb, message, to, timeout, consistencyLevel); + String id = addCallback(cb, message, to, timeout); if (cb instanceof AbstractWriteResponseHandler) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/net/WriteCallbackInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java deleted file mode 100644 index 8badbcf..0000000 --- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.apache.cassandra.net; - -import java.net.InetAddress; - -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.service.StorageProxy; - -public class WriteCallbackInfo extends CallbackInfo -{ - public final MessageOut sentMessage; - private final ConsistencyLevel consistencyLevel; - - public WriteCallbackInfo(InetAddress target, IMessageCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel) - { - super(target, callback, serializer); - assert message != null; - this.sentMessage = message; - this.consistencyLevel = consistencyLevel; - } - - public boolean shouldHint() - { - return consistencyLevel != ConsistencyLevel.ANY && StorageProxy.shouldHint(target); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index c0b7271..8a6e52e 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -196,34 +196,14 @@ public class StorageProxy implements StorageProxyMBean { responseHandler.get(); } + } catch (WriteTimeoutException ex) { - if (consistency_level == ConsistencyLevel.ANY) - { - // hint all the mutations (except counters, which can't be safely retried). This means - // we'll re-hint any successful ones; doesn't seem worth it to track individual success - // just for this unusual case. - for (IMutation mutation : mutations) - { - if (mutation instanceof CounterMutation) - continue; - - Token tk = StorageService.getPartitioner().getToken(mutation.key()); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getTable(), tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getTable()); - for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints)) - submitHint((RowMutation) mutation, target, null, consistency_level); - } - Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); - } - else - { - writeMetrics.timeouts.mark(); - ClientRequestMetrics.writeTimeouts.inc(); - Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); - throw ex; - } + writeMetrics.timeouts.mark(); + ClientRequestMetrics.writeTimeouts.inc(); + Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); + throw ex; } catch (UnavailableException e) { @@ -642,11 +622,11 @@ public class StorageProxy implements StorageProxyMBean { // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid // creating a second iterator since we already have a perfectly good one - MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel); + MessagingService.instance().sendRR(message, target, handler); while (iter.hasNext()) { target = iter.next(); - MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel); + MessagingService.instance().sendRR(message, target, handler); } return; } @@ -659,13 +639,13 @@ public class StorageProxy implements StorageProxyMBean { InetAddress destination = iter.next(); CompactEndpointSerializationHelper.serialize(destination, dos); - String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout(), handler.consistencyLevel); + String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout()); dos.writeUTF(id); } message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray()); // send the combined message + forward headers Tracing.trace("Enqueuing message to {}", target); - MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel); + MessagingService.instance().sendRR(message, target, handler); } private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)