On Fri, Oct 15, 2010 at 2:21 PM, Wayne <wav...@gmail.com> wrote:
> The optimization definitely shaved off some time. Now it is running at about
> 3x the CFSTATS-reported time. Below are the logs.
>
> There is a ~300ms time frame after the last ResponseVerbHandler prior to the
> resolver starting. Based on a quorum read, the response resolver should kick
> in after 2 reads come in, correct? That would mean it waited 500ms before
> starting. Where is this time going?

It's going to deserializing the replies to see if it has both the data
and enough digests to call resolve().  Then resolve() deserializes them
a second time, so there's an easy win in caching the first
deserialization; the attached patch does that.  (It applies on top of
the previous one, which has been committed to the 0.6 svn branch at
https://svn.apache.org/repos/asf/cassandra/branches/cassandra-0.6.)
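The caching idea can be sketched generically as memoizing the expensive parse per message. This is a hypothetical stand-alone sketch (ResponseCache, deserialize, and getResult are illustrative names, not the actual Cassandra classes); the real patch keys a HashMap on the Message and stores the deserialized ReadResponse:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the caching pattern: deserialize each message body
// at most once, and return the cached result on every later lookup.
public class ResponseCache {
    private final Map<String, Integer> results = new HashMap<String, Integer>();
    private int deserializeCalls = 0; // counts the expensive parses actually done

    // Stand-in for ReadResponse.serializer().deserialize(...)
    private Integer deserialize(String body) {
        deserializeCalls++;
        return body.length(); // placeholder for real parsing work
    }

    public Integer getResult(String message) {
        Integer cached = results.get(message);
        if (cached != null)
            return cached;              // second caller pays nothing
        Integer result = deserialize(message);
        results.put(message, result);
        return result;
    }

    public int calls() {
        return deserializeCalls;
    }
}
```

With this shape, isDataPresent and resolve can both call getResult and only the first call per message does the work.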

> There is still the 3+ second delay between the last 2 entries. Is this the
> thrift server?

It's converting the reply from Cassandra's internal representation to
thrift and sending it to the client.  I suspect most of that time is
spent actually sending, and waiting for the client to read.  Patch 2
also includes a debug statement after the convert-to-thrift stage to
verify.
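To separate the two stages, one could time the conversion and the client write independently. A minimal sketch, assuming stand-in methods (convertToThrift and sendToClient are hypothetical names, not the real Cassandra/Thrift API):

```java
// Hypothetical sketch: measure the convert-to-thrift stage separately from
// the client write, to see which one accounts for the 3+ second gap.
public class ReplyTimer {
    static String convertToThrift(String internal) {
        return "{" + internal + "}"; // placeholder for the thriftify work
    }

    static void sendToClient(String payload) {
        // placeholder for the socket write / wait for the client to read
    }

    // Returns { convert nanos, send nanos } so the two stages can be compared.
    public static long[] timeReply(String internal) {
        long t0 = System.nanoTime();
        String payload = convertToThrift(internal);
        long t1 = System.nanoTime();
        sendToClient(payload);
        long t2 = System.nanoTime();
        return new long[] { t1 - t0, t2 - t1 };
    }
}
```

The debug statement in the patch serves the same purpose with less machinery: if the log line appears quickly but the client sees the reply late, the time is in the send/read half.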

-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of Riptano, the source for professional Cassandra support
http://riptano.com
Index: src/java/org/apache/cassandra/thrift/CassandraServer.java
===================================================================
--- src/java/org/apache/cassandra/thrift/CassandraServer.java   (revision 1022989)
+++ src/java/org/apache/cassandra/thrift/CassandraServer.java   (working copy)
@@ -187,6 +187,9 @@
             columnFamiliesMap.put(command.key, thriftifiedColumns);
         }
 
+        if (logger.isDebugEnabled())
+            logger.debug("Slice converted to thrift; sending to client");
+
         return columnFamiliesMap;
     }
 
Index: src/java/org/apache/cassandra/service/ReadResponseResolver.java
===================================================================
--- src/java/org/apache/cassandra/service/ReadResponseResolver.java     (revision 1023089)
+++ src/java/org/apache/cassandra/service/ReadResponseResolver.java     (working copy)
@@ -22,10 +22,7 @@
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadResponse;
@@ -49,6 +46,7 @@
        private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
     private final String table;
     private final int responseCount;
+    private final HashMap<Message, ReadResponse> results;
 
     public ReadResponseResolver(String table, int responseCount)
     {
@@ -56,6 +54,7 @@
             : "invalid response count " + responseCount;
 
         this.responseCount = responseCount;
+        results = new HashMap<Message, ReadResponse>(responseCount);
         this.table = table;
     }
 
@@ -84,11 +83,10 @@
          * query exists then we need to compare the digest with 
          * the digest of the data that is received.
         */
-               for (Message response : responses)
-               {                                                   
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+               for (Message message : responses)
+               {
+            assert results.containsKey(message); // should be deserialized by isDataPresent
+            ReadResponse result = getResult(message);
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -97,14 +95,11 @@
             else
             {
                 versions.add(result.row().cf);
-                endPoints.add(response.getFrom());
+                endPoints.add(message.getFrom());
                 key = result.row().key;
             }
         }
 
-        if (logger_.isDebugEnabled())
-            logger_.debug("responses deserialized");
-
               // If there was a digest query compare it with all the data digests
               // If there is a mismatch then throw an exception so that read repair can happen.
         if (isDigestQuery)
@@ -195,25 +190,32 @@
         if (responses.size() < responseCount)
             return false;
 
-        boolean isDataPresent = false;
-        for (Message response : responses)
+        for (Message message : responses)
         {
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            try
-            {
-                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                if (!result.isDigestQuery())
-                {
-                    isDataPresent = true;
-                }
-                bufIn.close();
-            }
-            catch (IOException ex)
-            {
-                throw new RuntimeException(ex);
-            }
+            if (!getResult(message).isDigestQuery())
+                return true;
         }
-        return isDataPresent;
+
+        return false;
     }
+
+    private ReadResponse getResult(Message message)
+    {
+        ReadResponse result = results.get(message);
+        if (result != null)
+            return result;
+
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        try
+        {
+            result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            results.put(message, result);
+            return result;
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
 }
