On Fri, Oct 15, 2010 at 2:21 PM, Wayne <wav...@gmail.com> wrote:
> The optimization definitely shaved off some time. Now it is running about 3x
> CFSTATS reported time. Below are the logs.
>
> There is a ~300ms time frame after the last ResponseVerbHandler prior to the
> resolver starting. Based on a quorum read the response resolver should kick
> after 2 reads come in correct? That would mean it waited 500ms before
> starting. Where is this time going?

That time is going into deserializing the replies, to see if it has
both the data and enough digests to call resolve(). Then resolve()
deserializes them a second time, so there's an easy win in caching the
result of the first deserialize; the attached patch does that. (It
applies on top of the previous one, which has been committed to the
0.6 svn branch at
https://svn.apache.org/repos/asf/cassandra/branches/cassandra-0.6.)

> There is still the 3+ second delay between the last 2 entries. Is this the
> thrift server?

That is the time spent converting the reply from Cassandra's internal
representation to thrift and sending it to the client. I suspect most
of it is the actual sending, i.e. waiting for the client to read the
reply. Patch 2 also adds a debug statement after the convert-to-thrift
stage so this can be verified.

--
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of Riptano, the source for professional Cassandra support
http://riptano.com

Index: src/java/org/apache/cassandra/thrift/CassandraServer.java
===================================================================
--- src/java/org/apache/cassandra/thrift/CassandraServer.java (revision 1022989)
+++ src/java/org/apache/cassandra/thrift/CassandraServer.java (working copy)
@@ -187,6 +187,9 @@
             columnFamiliesMap.put(command.key, thriftifiedColumns);
         }
 
+        if (logger.isDebugEnabled())
+            logger.debug("Slice converted to thrift; sending to client");
+
         return columnFamiliesMap;
     }
 
Index: src/java/org/apache/cassandra/service/ReadResponseResolver.java
===================================================================
--- src/java/org/apache/cassandra/service/ReadResponseResolver.java (revision 1023089)
+++ src/java/org/apache/cassandra/service/ReadResponseResolver.java (working copy)
@@ -22,10 +22,7 @@
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadResponse;
@@ -49,6 +46,7 @@
     private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
     private final String table;
     private final int responseCount;
+    private final HashMap<Message, ReadResponse> results;
 
     public ReadResponseResolver(String table, int responseCount)
     {
@@ -56,6 +54,7 @@
             : "invalid response count " + responseCount;
 
         this.responseCount = responseCount;
+        results = new HashMap<Message, ReadResponse>(responseCount);
         this.table = table;
     }
 
@@ -84,11 +83,10 @@
          * query exists then we need to compare the digest with
          * the digest of the data that is received.
         */
-        for (Message response : responses)
-        {
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+        for (Message message : responses)
+        {
+            assert results.containsKey(message); // should be deserialized by isDataPresent
+            ReadResponse result = getResult(message);
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -97,14 +95,11 @@
             else
             {
                 versions.add(result.row().cf);
-                endPoints.add(response.getFrom());
+                endPoints.add(message.getFrom());
                 key = result.row().key;
             }
         }
 
-        if (logger_.isDebugEnabled())
-            logger_.debug("responses deserialized");
-
         // If there was a digest query compare it with all the data digests
         // If there is a mismatch then throw an exception so that read repair can happen.
         if (isDigestQuery)
@@ -195,25 +190,32 @@
         if (responses.size() < responseCount)
             return false;
 
-        boolean isDataPresent = false;
-        for (Message response : responses)
+        for (Message message : responses)
         {
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            try
-            {
-                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                if (!result.isDigestQuery())
-                {
-                    isDataPresent = true;
-                }
-                bufIn.close();
-            }
-            catch (IOException ex)
-            {
-                throw new RuntimeException(ex);
-            }
+            if (!getResult(message).isDigestQuery())
+                return true;
         }
-        return isDataPresent;
+
+        return false;
     }
+
+    private ReadResponse getResult(Message message)
+    {
+        ReadResponse result = results.get(message);
+        if (result != null)
+            return result;
+
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        try
+        {
+            result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            results.put(message, result);
+            return result;
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
 }
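
For anyone who wants to try the caching idea outside the Cassandra tree, here is a minimal standalone sketch of it. Everything in it is an illustrative assumption rather than Cassandra code: the class CachingResolverSketch, the nested Reply type, the encode() helper, the toy wire format (one boolean plus one UTF string) and the deserializeCalls counter are all hypothetical. Raw byte[] bodies stand in for Message, and an IdentityHashMap stands in for the patch's HashMap<Message, ReadResponse>. It is single-threaded and makes no attempt to model how responses actually arrive.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: memoize the decoded form of each reply so that the
// "is the data here yet?" check and the later resolve step share one decode.
final class CachingResolverSketch {
    // Stand-in for a decoded reply; not Cassandra's ReadResponse.
    static final class Reply {
        final boolean isDigest;
        final String payload;
        Reply(boolean isDigest, String payload) {
            this.isDigest = isDigest;
            this.payload = payload;
        }
    }

    private final int responseCount;
    // Keyed by array identity: the same byte[] instance is decoded only once.
    private final Map<byte[], Reply> results = new IdentityHashMap<>();
    int deserializeCalls = 0; // instrumentation for the demo only

    CachingResolverSketch(int responseCount) {
        this.responseCount = responseCount;
    }

    // Toy wire format (an assumption): digest flag followed by a UTF string.
    static byte[] encode(boolean isDigest, String payload) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeBoolean(isDigest);
            out.writeUTF(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Decode on first use, then serve the cached object.
    Reply getResult(byte[] message) {
        Reply cached = results.get(message);
        if (cached != null)
            return cached;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(message))) {
            deserializeCalls++;
            Reply reply = new Reply(in.readBoolean(), in.readUTF());
            results.put(message, reply);
            return reply;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // True once enough replies are in and at least one carries real data.
    boolean isDataPresent(List<byte[]> responses) {
        if (responses.size() < responseCount)
            return false;
        for (byte[] message : responses) {
            if (!getResult(message).isDigest)
                return true;
        }
        return false;
    }

    // Returns the payload of the last data reply, or null if there is none.
    String resolve(List<byte[]> responses) {
        String data = null;
        for (byte[] message : responses) {
            Reply reply = getResult(message);
            if (!reply.isDigest)
                data = reply.payload;
        }
        return data;
    }

    public static void main(String[] args) {
        CachingResolverSketch resolver = new CachingResolverSketch(2);
        List<byte[]> responses = Arrays.asList(
            encode(true, "digest-of-row"), encode(false, "row-data"));
        if (resolver.isDataPresent(responses)) {
            System.out.println("resolved=" + resolver.resolve(responses)
                + " deserializeCalls=" + resolver.deserializeCalls);
        }
    }
}
```

In this sketch a two-reply read decodes each body once (two decodes in total) where an uncached version would decode each body again in resolve(). Note that getResult() here decodes lazily on a cache miss, so resolve() still works if isDataPresent() returned early without touching every reply.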