v3 attached to fix assertion failure.

Those last seconds after "sending to client" aren't fixable, except
perhaps by using a faster client language.  The client is probably
doing read-and-deserialize as one step rather than two, so while it is
slowly deserializing, the server sits waiting to send the rest of a
large response.  But while that's happening the server is using
negligible resources, so it's not the end of the world for a
multithreaded app.
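A minimal sketch of what "two steps" could look like on the client side, written in Java only for illustration (the thread doesn't say which language the client uses). It is not Thrift's wire format: it assumes a 4-byte length-prefixed frame, similar in spirit to Thrift's framed transport, and a made-up payload of column names. The class and method names (TwoStepRead, readFrame, deserialize, encode) are hypothetical. The socket is drained in one step and parsing happens afterward from memory. This doesn't make the client finish any sooner; it only shortens how long the server is tied up sending.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical client sketch: read the whole reply first, deserialize second.
public class TwoStepRead {
    // Step 1: read one length-prefixed frame (assumed framing) as fast as possible.
    static byte[] readFrame(InputStream wire) {
        try {
            DataInputStream in = new DataInputStream(wire);
            byte[] frame = new byte[in.readInt()]; // 4-byte big-endian length prefix
            in.readFully(frame);                   // network I/O for this reply ends here
            return frame;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Step 2: parse from memory; the sender has already finished.
    // Toy payload: an int count followed by writeUTF-encoded column names.
    static List<String> deserialize(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            int count = in.readInt();
            List<String> columns = new ArrayList<>(count);
            for (int i = 0; i < count; i++)
                columns.add(in.readUTF());
            return columns;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a framed reply in the same toy format, standing in for the server.
    static byte[] encode(List<String> columns) {
        try {
            ByteArrayOutputStream payload = new ByteArrayOutputStream();
            DataOutputStream p = new DataOutputStream(payload);
            p.writeInt(columns.size());
            for (String c : columns)
                p.writeUTF(c);
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream w = new DataOutputStream(wire);
            w.writeInt(payload.size()); // length prefix
            payload.writeTo(w);         // then the payload bytes
            return wire.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream socket = new ByteArrayInputStream(encode(List.of("503a", "503a7c")));
        byte[] frame = readFrame(socket);
        System.out.println(deserialize(frame));
    }
}
```

Running main (Java 9+ for List.of) prints [503a, 503a7c].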

On Mon, Oct 18, 2010 at 2:37 PM, Wayne <wav...@gmail.com> wrote:
> It is much faster again, but the last 3 seconds still linger. There are a
> lot more "logout complete" messages being logged all the time, but the one
> below lines up with the time the client reports it took to get the data
> back. It also produces errors sometimes; an example error is listed below.
>
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:28,866 CassandraServer.java (line 219) get_slice
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:28,867 StorageProxy.java (line 471) strongread reading data for SliceFromReadCommand(table='table', key='key1', column_parent='QueryPath(columnFamilyName='fact', superColumnName='null', columnName='null')', start='503a', finish='503a7c', reversed=false, count=10000000) from 698@/x.x.x.6
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:28,867 StorageProxy.java (line 471) strongread reading digest for SliceFromReadCommand(table='table', key='key1', column_parent='QueryPath(columnFamilyName='fact', superColumnName='null', columnName='null')', start='503a', finish='503a7c', reversed=false, count=10000000) from 699@/x.x.x.7
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:28,867 StorageProxy.java (line 471) strongread reading digest for SliceFromReadCommand(table='table', key='key1', column_parent='QueryPath(columnFamilyName='fact', superColumnName='null', columnName='null')', start='503a', finish='503a7c', reversed=false, count=10000000) from 699@/x.x.x.8
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:29,410 CassandraServer.java (line 667) logout complete
> DEBUG [RESPONSE-STAGE:5] 2010-10-18 19:25:30,864 ResponseVerbHandler.java (line 42) Processing response on a callback from 0952ED39-07CE-7971-8F06-0D611FCB5F34@/x.x.x.6
> DEBUG [RESPONSE-STAGE:6] 2010-10-18 19:25:31,449 ResponseVerbHandler.java (line 42) Processing response on a callback from 0952ED39-07CE-7971-8F06-0D611FCB5F34@/x.x.x.8
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:31,449 ReadResponseResolver.java (line 71) resolving 2 responses
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:31,608 ReadResponseResolver.java (line 116) digests verified
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:31,609 ReadResponseResolver.java (line 133) resolve: 160 ms.
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:31,609 StorageProxy.java (line 494) quorumResponseHandler: 2742 ms.
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:31,667 CassandraServer.java (line 191) Slice converted to thrift; sending to client
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:32,044 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:32,238 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:32,377 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:33,890 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:34,401 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:34,538 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:34,925 CassandraServer.java (line 667) logout complete
>
> Error:
>
> DEBUG [RESPONSE-STAGE:4] 2010-10-18 19:22:48,355 ResponseVerbHandler.java (line 42) Processing response on a callback from DDF1D615-BF1E-2CD1-34C6-96FA47D63259@/x.x.x.8
> ERROR [RESPONSE-STAGE:4] 2010-10-18 19:22:48,356 CassandraDaemon.java (line 87) Uncaught exception in thread Thread[RESPONSE-STAGE:4,5,main]
> java.lang.AssertionError
>     at org.apache.cassandra.service.ReadResponseResolver.getResult(ReadResponseResolver.java:218)
>     at org.apache.cassandra.service.ReadResponseResolver.isDataPresent(ReadResponseResolver.java:209)
>     at org.apache.cassandra.service.QuorumResponseHandler.response(QuorumResponseHandler.java:93)
>     at org.apache.cassandra.net.ResponseVerbHandler.doVerb(ResponseVerbHandler.java:44)
>     at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:49)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
> DEBUG [pool-1-thread-33] 2010-10-18 19:22:48,380 ReadResponseResolver.java (line 71) resolving 2 responses
> DEBUG [pool-1-thread-33] 2010-10-18 19:22:48,542 ReadResponseResolver.java (line 116) digests verified
> DEBUG [pool-1-thread-33] 2010-10-18 19:22:48,542 ReadResponseResolver.java (line 133) resolve: 162 ms.
> DEBUG [pool-1-thread-33] 2010-10-18 19:22:48,542 StorageProxy.java (line 494) quorumResponseHandler: 2688 ms.
>
> On Sat, Oct 16, 2010 at 9:18 PM, Jonathan Ellis <jbel...@gmail.com> wrote:
>>
>> Thanks.  Take 2 attached.
>>
>> On Sat, Oct 16, 2010 at 3:37 PM, Wayne <wav...@gmail.com> wrote:
>> > ERROR [pool-1-thread-64] 2010-10-16 20:27:55,396 Cassandra.java (line 1280) Internal error processing get_slice
>> > java.lang.AssertionError
>> >     at org.apache.cassandra.service.ReadResponseResolver.resolve(ReadResponseResolver.java:88)
>> >     at org.apache.cassandra.service.ReadResponseResolver.resolve(ReadResponseResolver.java:44)
>> >     at org.apache.cassandra.service.QuorumResponseHandler.get(QuorumResponseHandler.java:86)
>> >     at org.apache.cassandra.service.StorageProxy.strongRead(StorageProxy.java:489)
>> >     at org.apache.cassandra.service.StorageProxy.readProtocol(StorageProxy.java:353)
>> >     at org.apache.cassandra.thrift.CassandraServer.readColumnFamily(CassandraServer.java:97)
>> >     at org.apache.cassandra.thrift.CassandraServer.getSlice(CassandraServer.java:180)
>> >     at org.apache.cassandra.thrift.CassandraServer.multigetSliceInternal(CassandraServer.java:262)
>> >     at org.apache.cassandra.thrift.CassandraServer.get_slice(CassandraServer.java:223)
>> >     at org.apache.cassandra.thrift.Cassandra$Processor$get_slice.process(Cassandra.java:1272)
>> >     at org.apache.cassandra.thrift.Cassandra$Processor.process(Cassandra.java:1166)
>> >     at org.apache.cassandra.thrift.CustomTThreadPoolServer$WorkerProcess.run(CustomTThreadPoolServer.java:167)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> >     at java.lang.Thread.run(Thread.java:619)
>> >
>> >
>> > On Sat, Oct 16, 2010 at 9:52 AM, Jonathan Ellis <jbel...@gmail.com> wrote:
>> >>
>> >> Stack trace from cassandra log?
>> >>
>> >> On Sat, Oct 16, 2010 at 6:50 AM, Wayne <wav...@gmail.com> wrote:
>> >> > While doing a read I get a TApplicationException: Internal Error
>> >> > processing get_slice.
>> >> >
>> >> > On Fri, Oct 15, 2010 at 5:49 PM, Jonathan Ellis <jbel...@gmail.com> wrote:
>> >> >>
>> >> >> On Fri, Oct 15, 2010 at 2:21 PM, Wayne <wav...@gmail.com> wrote:
>> >> >> > The optimization definitely shaved off some time. Now it is
>> >> >> > running about 3x the time CFSTATS reports. Below are the logs.
>> >> >> >
>> >> >> > There is a ~300ms window after the last ResponseVerbHandler entry
>> >> >> > before the resolver starts. For a quorum read, the response
>> >> >> > resolver should kick in once 2 reads have come in, correct? That
>> >> >> > would mean it waited 500ms before starting. Where is this time
>> >> >> > going?
>> >> >>
>> >> >> That time is spent deserializing the replies to see whether it has
>> >> >> both the data and enough digests to call resolve().  Then resolve()
>> >> >> deserializes them a 2nd time, so there's an easy win in caching the
>> >> >> first deserialization, which the attached patch does.  (It applies on
>> >> >> top of the previous one, which has been committed to the 0.6 svn
>> >> >> branch at
>> >> >> https://svn.apache.org/repos/asf/cassandra/branches/cassandra-0.6.)
>> >> >>
>> >> >> > There is still the 3+ second delay between the last 2 entries. Is
>> >> >> > this the thrift server?
>> >> >>
>> >> >> It's converting the reply from Cassandra's internal representation
>> >> >> to thrift and sending it to the client.  I suspect most of the time
>> >> >> is in the actual sending part, waiting for the client to read.
>> >> >> Patch 2 also includes a debug statement after the convert-to-thrift
>> >> >> stage to verify this.
>> >> >>
>> >> >> --
>> >> >> Jonathan Ellis
>> >> >> Project Chair, Apache Cassandra
>> >> >> co-founder of Riptano, the source for professional Cassandra support
>> >> >> http://riptano.com
>> >> >
>> >> >
>> >>
>> >
>> >
>>
>
>



-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of Riptano, the source for professional Cassandra support
http://riptano.com
Index: src/java/org/apache/cassandra/thrift/CassandraServer.java
===================================================================
--- src/java/org/apache/cassandra/thrift/CassandraServer.java   (revision 1022989)
+++ src/java/org/apache/cassandra/thrift/CassandraServer.java   (working copy)
@@ -187,6 +187,9 @@
             columnFamiliesMap.put(command.key, thriftifiedColumns);
         }
 
+        if (logger.isDebugEnabled())
+            logger.debug("Slice converted to thrift; sending to client");
+
         return columnFamiliesMap;
     }
 
Index: src/java/org/apache/cassandra/service/QuorumResponseHandler.java
===================================================================
--- src/java/org/apache/cassandra/service/QuorumResponseHandler.java    (revision 1022989)
+++ src/java/org/apache/cassandra/service/QuorumResponseHandler.java    (working copy)
@@ -89,6 +89,7 @@
     public void response(Message message)
     {
         responses.add(message);
+        responseResolver.preprocess(message);
         if (responseResolver.isDataPresent(responses))
         {
             condition.signal();
Index: src/java/org/apache/cassandra/service/ReadResponseResolver.java
===================================================================
--- src/java/org/apache/cassandra/service/ReadResponseResolver.java     (revision 1023089)
+++ src/java/org/apache/cassandra/service/ReadResponseResolver.java     (working copy)
@@ -22,10 +22,7 @@
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadResponse;
@@ -37,6 +34,7 @@
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.apache.log4j.Logger;
 
@@ -49,6 +47,7 @@
       private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
     private final String table;
     private final int responseCount;
+    private final Map<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
 
     public ReadResponseResolver(String table, int responseCount)
     {
@@ -84,11 +83,11 @@
          * query exists then we need to compare the digest with 
          * the digest of the data that is received.
         */
-               for (Message response : responses)
-               {                                                   
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+               for (Message message : responses)
+               {
+            ReadResponse result = results.get(message);
+            if (result == null)
+                continue; // arrived after quorum already achieved
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -97,14 +96,11 @@
             else
             {
                 versions.add(result.row().cf);
-                endPoints.add(response.getFrom());
+                endPoints.add(message.getFrom());
                 key = result.row().key;
             }
         }
 
-        if (logger_.isDebugEnabled())
-            logger_.debug("responses deserialized");
-
                // If there was a digest query compare it with all the data digests
                // If there is a mismatch then throw an exception so that read repair can happen.
         if (isDigestQuery)
@@ -190,30 +186,36 @@
         return resolved;
     }
 
-       public boolean isDataPresent(Collection<Message> responses)
+    public void preprocess(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        try
+        {
+            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            results.put(message, result);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    public boolean isDataPresent(Collection<Message> responses)
        {
         if (responses.size() < responseCount)
             return false;
 
-        boolean isDataPresent = false;
-        for (Message response : responses)
+        for (Message message : responses)
         {
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            try
-            {
-                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                if (!result.isDigestQuery())
-                {
-                    isDataPresent = true;
-                }
-                bufIn.close();
-            }
-            catch (IOException ex)
-            {
-                throw new RuntimeException(ex);
-            }
+            ReadResponse result = results.get(message);
+            if (result == null)
+                continue; // arrived after quorum already achieved
+            if (!result.isDigestQuery())
+                return true;
         }
-        return isDataPresent;
+
+        return false;
     }
+
 }
Index: src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
===================================================================
--- src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java       (revision 1022989)
+++ src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java       (working copy)
@@ -108,6 +108,10 @@
         return resolvedRows;
     }
 
+    public void preprocess(Message message)
+    {
+    }
+
     public boolean isDataPresent(Collection<Message> responses)
     {
         return responses.size() >= sources.size();
Index: src/java/org/apache/cassandra/service/IResponseResolver.java
===================================================================
--- src/java/org/apache/cassandra/service/IResponseResolver.java        (revision 1022989)
+++ src/java/org/apache/cassandra/service/IResponseResolver.java        (working copy)
@@ -37,4 +37,5 @@
       public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
        public boolean isDataPresent(Collection<Message> responses);
 
+    public void preprocess(Message message);
 }
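The caching idea in this patch (deserialize each reply once in preprocess, keep the result in the results map, and reuse it from isDataPresent and resolve) can be illustrated outside Cassandra with a small generic class. This is a hypothetical sketch, not Cassandra code: CachingResolver and its constructor parameters are invented for illustration, it uses ConcurrentHashMap.computeIfAbsent in place of the patch's NonBlockingHashMap, the toy "deserializer" just counts how often it runs, and messages are plain strings rather than Message objects. The responseCount of 2 is only an example (a QUORUM read with three replicas, as in the logs above).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified analogue of the patched resolver:
// each reply is deserialized at most once and the parsed form is reused.
public class CachingResolver<M, R> {
    private final Map<M, R> parsed = new ConcurrentHashMap<>();
    private final Function<M, R> deserializer; // parses one raw reply
    private final Predicate<R> isDigest;       // true for digest-only replies
    private final int responseCount;           // replies needed before resolving

    public CachingResolver(Function<M, R> deserializer, Predicate<R> isDigest, int responseCount) {
        this.deserializer = deserializer;
        this.isDigest = isDigest;
        this.responseCount = responseCount;
    }

    // Called per arriving reply; computeIfAbsent skips already-parsed messages.
    public void preprocess(M message) {
        parsed.computeIfAbsent(message, deserializer);
    }

    // Enough replies, and at least one of them carries the actual data?
    public boolean isDataPresent(Collection<M> responses) {
        if (responses.size() < responseCount)
            return false;
        for (M message : responses) {
            R result = parsed.get(message);
            if (result == null)
                continue; // in the collection but not preprocessed yet
            if (!isDigest.test(result))
                return true;
        }
        return false;
    }

    // What a resolve step would iterate over: cached results, no second parse.
    public List<R> results(Collection<M> responses) {
        List<R> out = new ArrayList<>();
        for (M message : responses) {
            R result = parsed.get(message);
            if (result != null)
                out.add(result);
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger parses = new AtomicInteger();
        // Toy replies: "data:<row>" or "digest:<hash>"; "parsing" only counts calls.
        CachingResolver<String, String> resolver = new CachingResolver<>(
                raw -> { parses.incrementAndGet(); return raw; },
                r -> r.startsWith("digest:"),
                2);

        List<String> responses = new ArrayList<>();
        for (String reply : List.of("digest:ab12", "data:row1")) {
            resolver.preprocess(reply); // parse first...
            responses.add(reply);       // ...then publish to the shared collection
            System.out.println("data present: " + resolver.isDataPresent(responses));
        }
        System.out.println(resolver.results(responses) + ", parsed " + parses.get() + " times");
    }
}
```

Running main prints "data present: false", then "data present: true", then "[digest:ab12, data:row1], parsed 2 times". In this sketch each reply is preprocessed before it is added to the shared collection, so any reply isDataPresent can see already has a cached result; the null check stays as a guard for callers that publish first and parse second, which is the order the patched QuorumResponseHandler.response uses.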
