v3 attached to fix the assertion failure.

Those last seconds after "sending to client" aren't fixable except perhaps by using a faster client language. That is, the client is probably doing read-and-deserialize as one step rather than two, so if it is slow in the deserialize part, the server sits there waiting to send the rest of a large response. But while that's happening it's using negligible resources on the server side, so it's not the end of the world for a multithreaded app.
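To make the "one step rather than two" point concrete, here is a minimal sketch of a client that drains the whole response into memory first and only deserializes afterwards. It assumes a made-up length-prefixed framing and a made-up payload layout; it is not Thrift's wire format or API, and `TwoStepRead`, `readFrame`, `deserialize` and `encode` are hypothetical names used only for illustration. Whether a given client library allows this split is a separate question.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

class TwoStepRead
{
    // Step 1: pull one length-prefixed frame off the stream without interpreting it.
    // (Hypothetical framing: a 4-byte length followed by that many payload bytes.)
    static byte[] readFrame(InputStream in)
    {
        try
        {
            DataInputStream din = new DataInputStream(in);
            byte[] frame = new byte[din.readInt()];
            din.readFully(frame);
            return frame;
        }
        catch (IOException e)
        {
            throw new IOError(e);
        }
    }

    // Step 2: deserialize from memory. The sender has already finished writing,
    // so a slow deserializer no longer holds the connection open.
    // (Hypothetical payload: a column count followed by that many UTF strings.)
    static List<String> deserialize(byte[] frame)
    {
        try
        {
            DataInputStream din = new DataInputStream(new ByteArrayInputStream(frame));
            int count = din.readInt();
            List<String> columns = new ArrayList<String>(count);
            for (int i = 0; i < count; i++)
                columns.add(din.readUTF());
            return columns;
        }
        catch (IOException e)
        {
            throw new IOError(e);
        }
    }

    // Example-only helper: builds the bytes the hypothetical server would send.
    static byte[] encode(List<String> columns)
    {
        try
        {
            ByteArrayOutputStream payload = new ByteArrayOutputStream();
            DataOutputStream pout = new DataOutputStream(payload);
            pout.writeInt(columns.size());
            for (String column : columns)
                pout.writeUTF(column);
            pout.flush();

            ByteArrayOutputStream framed = new ByteArrayOutputStream();
            DataOutputStream fout = new DataOutputStream(framed);
            fout.writeInt(payload.size());
            payload.writeTo(fout);
            fout.flush();
            return framed.toByteArray();
        }
        catch (IOException e)
        {
            throw new IOError(e);
        }
    }
}
```

The trade-off is memory: the client buffers the entire response before touching it, which for a slice with a very large count may be a lot.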
On Mon, Oct 18, 2010 at 2:37 PM, Wayne <wav...@gmail.com> wrote:
> It is much faster again, but the last 3 seconds still linger. There a lot
> more logout complete messages getting logged all of the time, but the one
> below matches in time to the time the client registers it takes to get the
> data back. It also produces errors sometimes...the error example is listed
> below.
>
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:28,866 CassandraServer.java (line 219) get_slice
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:28,867 StorageProxy.java (line 471) strongread reading data for SliceFromReadCommand(table='table', key='key1', column_parent='QueryPath(columnFamilyName='fact', superColumnName='null', columnName='null')', start='503a', finish='503a7c', reversed=false, count=10000000) from 698@/x.x.x.6
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:28,867 StorageProxy.java (line 471) strongread reading digest for SliceFromReadCommand(table='table', key='key1', column_parent='QueryPath(columnFamilyName='fact', superColumnName='null', columnName='null')', start='503a', finish='503a7c', reversed=false, count=10000000) from 699@/x.x.x.7
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:28,867 StorageProxy.java (line 471) strongread reading digest for SliceFromReadCommand(table='table', key='key1', column_parent='QueryPath(columnFamilyName='fact', superColumnName='null', columnName='null')', start='503a', finish='503a7c', reversed=false, count=10000000) from 699@/x.x.x.8
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:29,410 CassandraServer.java (line 667) logout complete
> DEBUG [RESPONSE-STAGE:5] 2010-10-18 19:25:30,864 ResponseVerbHandler.java (line 42) Processing response on a callback from 0952ED39-07CE-7971-8F06-0D611FCB5F34@/x.x.x.6
> DEBUG [RESPONSE-STAGE:6] 2010-10-18 19:25:31,449 ResponseVerbHandler.java (line 42) Processing response on a callback from 0952ED39-07CE-7971-8F06-0D611FCB5F34@/x.x.x.8
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:31,449 ReadResponseResolver.java (line 71) resolving 2 responses
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:31,608 ReadResponseResolver.java (line 116) digests verified
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:31,609 ReadResponseResolver.java (line 133) resolve: 160 ms.
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:31,609 StorageProxy.java (line 494) quorumResponseHandler: 2742 ms.
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:31,667 CassandraServer.java (line 191) Slice converted to thrift; sending to client
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:32,044 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:32,238 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:32,377 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:33,890 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:34,401 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-63] 2010-10-18 19:25:34,538 CassandraServer.java (line 667) logout complete
> DEBUG [pool-1-thread-64] 2010-10-18 19:25:34,925 CassandraServer.java (line 667) logout complete
>
> Error:
>
> DEBUG [RESPONSE-STAGE:4] 2010-10-18 19:22:48,355 ResponseVerbHandler.java (line 42) Processing response on a callback from DDF1D615-BF1E-2CD1-34C6-96FA47D63259@/x.x.x.8
> ERROR [RESPONSE-STAGE:4] 2010-10-18 19:22:48,356 CassandraDaemon.java (line 87) Uncaught exception in thread Thread[RESPONSE-STAGE:4,5,main]
> java.lang.AssertionError
>     at org.apache.cassandra.service.ReadResponseResolver.getResult(ReadResponseResolver.java:218)
>     at org.apache.cassandra.service.ReadResponseResolver.isDataPresent(ReadResponseResolver.java:209)
>     at org.apache.cassandra.service.QuorumResponseHandler.response(QuorumResponseHandler.java:93)
>     at org.apache.cassandra.net.ResponseVerbHandler.doVerb(ResponseVerbHandler.java:44)
>     at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:49)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
> DEBUG [pool-1-thread-33] 2010-10-18 19:22:48,380 ReadResponseResolver.java (line 71) resolving 2 responses
> DEBUG [pool-1-thread-33] 2010-10-18 19:22:48,542 ReadResponseResolver.java (line 116) digests verified
> DEBUG [pool-1-thread-33] 2010-10-18 19:22:48,542 ReadResponseResolver.java (line 133) resolve: 162 ms.
> DEBUG [pool-1-thread-33] 2010-10-18 19:22:48,542 StorageProxy.java (line 494) quorumResponseHandler: 2688 ms.
>
>
> On Sat, Oct 16, 2010 at 9:18 PM, Jonathan Ellis <jbel...@gmail.com> wrote:
>>
>> Thanks. Take 2 attached.
>>
>> On Sat, Oct 16, 2010 at 3:37 PM, Wayne <wav...@gmail.com> wrote:
>> > ERROR [pool-1-thread-64] 2010-10-16 20:27:55,396 Cassandra.java (line 1280) Internal error processing get_slice
>> > java.lang.AssertionError
>> >     at org.apache.cassandra.service.ReadResponseResolver.resolve(ReadResponseResolver.java:88)
>> >     at org.apache.cassandra.service.ReadResponseResolver.resolve(ReadResponseResolver.java:44)
>> >     at org.apache.cassandra.service.QuorumResponseHandler.get(QuorumResponseHandler.java:86)
>> >     at org.apache.cassandra.service.StorageProxy.strongRead(StorageProxy.java:489)
>> >     at org.apache.cassandra.service.StorageProxy.readProtocol(StorageProxy.java:353)
>> >     at org.apache.cassandra.thrift.CassandraServer.readColumnFamily(CassandraServer.java:97)
>> >     at org.apache.cassandra.thrift.CassandraServer.getSlice(CassandraServer.java:180)
>> >     at org.apache.cassandra.thrift.CassandraServer.multigetSliceInternal(CassandraServer.java:262)
>> >     at org.apache.cassandra.thrift.CassandraServer.get_slice(CassandraServer.java:223)
>> >     at org.apache.cassandra.thrift.Cassandra$Processor$get_slice.process(Cassandra.java:1272)
>> >     at org.apache.cassandra.thrift.Cassandra$Processor.process(Cassandra.java:1166)
>> >     at org.apache.cassandra.thrift.CustomTThreadPoolServer$WorkerProcess.run(CustomTThreadPoolServer.java:167)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> >     at java.lang.Thread.run(Thread.java:619)
>> >
>> >
>> > On Sat, Oct 16, 2010 at 9:52 AM, Jonathan Ellis <jbel...@gmail.com> wrote:
>> >>
>> >> Stack trace from cassandra log?
>> >>
>> >> On Sat, Oct 16, 2010 at 6:50 AM, Wayne <wav...@gmail.com> wrote:
>> >> > While doing a read I get a TApplicationException: Internal Error
>> >> > processing get_slice.
>> >> >
>> >> > On Fri, Oct 15, 2010 at 5:49 PM, Jonathan Ellis <jbel...@gmail.com> wrote:
>> >> >>
>> >> >> On Fri, Oct 15, 2010 at 2:21 PM, Wayne <wav...@gmail.com> wrote:
>> >> >> > The optimization definitely shaved off some time. Now it is running
>> >> >> > about 3x CFSTATS reported time. Below are the logs.
>> >> >> >
>> >> >> > There is a ~300ms time frame after the last ResponseVerbHandler prior
>> >> >> > to the resolver starting. Based on a quorum read the response resolver
>> >> >> > should kick after 2 reads come in correct? That would mean it waited
>> >> >> > 500ms before starting. Where is this time going?
>> >> >>
>> >> >> It's going to deserialize the replies to see if it has both the data
>> >> >> and enough digests to call resolve(). Then resolve deserializes them
>> >> >> a 2nd time, so there's an easy win there caching the first
>> >> >> deserialize, in the patch attached. (Applies on top of the previous
>> >> >> one, which has been committed to the 0.6 svn branch at
>> >> >> https://svn.apache.org/repos/asf/cassandra/branches/cassandra-0.6.)
>> >> >>
>> >> >> > There is still the 3+ second delay between the last 2 entries. Is
>> >> >> > this the thrift server?
>> >> >>
>> >> >> It's converting the reply from Cassandra's internal representation to
>> >> >> thrift, and sending it to the client. I suspect most of the time is
>> >> >> the actual sending/waiting for the client to read part. Patch 2 also
>> >> >> includes a debug statement after the convert-to-thrift stage to
>> >> >> verify.
>> >> >>
>> >> >> --
>> >> >> Jonathan Ellis
>> >> >> Project Chair, Apache Cassandra
>> >> >> co-founder of Riptano, the source for professional Cassandra support
>> >> >> http://riptano.com
>> >> >
>> >> >
>> >>
>> >>
>> >>
>> >> --
>> >> Jonathan Ellis
>> >> Project Chair, Apache Cassandra
>> >> co-founder of Riptano, the source for professional Cassandra support
>> >> http://riptano.com
>> >
>> >
>>
>>
>>
>> --
>> Jonathan Ellis
>> Project Chair, Apache Cassandra
>> co-founder of Riptano, the source for professional Cassandra support
>> http://riptano.com
>
>

--
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of Riptano, the source for professional Cassandra support
http://riptano.com

Index: src/java/org/apache/cassandra/thrift/CassandraServer.java
===================================================================
--- src/java/org/apache/cassandra/thrift/CassandraServer.java	(revision 1022989)
+++ src/java/org/apache/cassandra/thrift/CassandraServer.java	(working copy)
@@ -187,6 +187,9 @@
             columnFamiliesMap.put(command.key, thriftifiedColumns);
         }
 
+        if (logger.isDebugEnabled())
+            logger.debug("Slice converted to thrift; sending to client");
+
         return columnFamiliesMap;
     }
 
Index: src/java/org/apache/cassandra/service/QuorumResponseHandler.java
===================================================================
--- src/java/org/apache/cassandra/service/QuorumResponseHandler.java	(revision 1022989)
+++ src/java/org/apache/cassandra/service/QuorumResponseHandler.java	(working copy)
@@ -89,6 +89,7 @@
     public void response(Message message)
     {
         responses.add(message);
+        responseResolver.preprocess(message);
         if (responseResolver.isDataPresent(responses))
         {
             condition.signal();
Index: src/java/org/apache/cassandra/service/ReadResponseResolver.java
===================================================================
--- src/java/org/apache/cassandra/service/ReadResponseResolver.java	(revision 1023089)
+++ src/java/org/apache/cassandra/service/ReadResponseResolver.java	(working copy)
@@ -22,10 +22,7 @@
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadResponse;
@@ -37,6 +34,7 @@
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.apache.log4j.Logger;
 
@@ -49,6 +47,7 @@
     private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
     private final String table;
     private final int responseCount;
+    private final Map<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
 
     public ReadResponseResolver(String table, int responseCount)
     {
@@ -84,11 +83,11 @@
          * query exists then we need to compare the digest with
          * the digest of the data that is received.
         */
-        for (Message response : responses)
-        {
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+        for (Message message : responses)
+        {
+            ReadResponse result = results.get(message);
+            if (result == null)
+                continue; // arrived after quorum already achieved
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -97,14 +96,11 @@
             else
             {
                 versions.add(result.row().cf);
-                endPoints.add(response.getFrom());
+                endPoints.add(message.getFrom());
                 key = result.row().key;
             }
         }
 
-        if (logger_.isDebugEnabled())
-            logger_.debug("responses deserialized");
-
         // If there was a digest query compare it with all the data digests
         // If there is a mismatch then throw an exception so that read repair can happen.
         if (isDigestQuery)
@@ -190,30 +186,36 @@
         return resolved;
     }
 
-    public boolean isDataPresent(Collection<Message> responses)
+    public void preprocess(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        try
+        {
+            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            results.put(message, result);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    public boolean isDataPresent(Collection<Message> responses)
     {
         if (responses.size() < responseCount)
             return false;
 
-        boolean isDataPresent = false;
-        for (Message response : responses)
+        for (Message message : responses)
         {
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            try
-            {
-                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                if (!result.isDigestQuery())
-                {
-                    isDataPresent = true;
-                }
-                bufIn.close();
-            }
-            catch (IOException ex)
-            {
-                throw new RuntimeException(ex);
-            }
+            ReadResponse result = results.get(message);
+            if (result == null)
+                continue; // arrived after quorum already achieved
+            if (!result.isDigestQuery())
+                return true;
         }
-        return isDataPresent;
+
+        return false;
     }
+
 }
Index: src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
===================================================================
--- src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java	(revision 1022989)
+++ src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java	(working copy)
@@ -108,6 +108,10 @@
         return resolvedRows;
     }
 
+    public void preprocess(Message message)
+    {
+    }
+
     public boolean isDataPresent(Collection<Message> responses)
     {
         return responses.size() >= sources.size();
Index: src/java/org/apache/cassandra/service/IResponseResolver.java
===================================================================
--- src/java/org/apache/cassandra/service/IResponseResolver.java	(revision 1022989)
+++ src/java/org/apache/cassandra/service/IResponseResolver.java	(working copy)
@@ -37,4 +37,5 @@
     public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
 
     public boolean isDataPresent(Collection<Message> responses);
+    public void preprocess(Message message);
 }
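
For readers who want the idea without the surrounding Cassandra code, here is a standalone sketch of the deserialize-once pattern the patch uses: each reply is parsed when it arrives and the parsed form is kept in a map keyed by the message, so later checks look it up instead of parsing again. Everything in it is a simplified stand-in: `Msg` and `Parsed` are hypothetical replacements for `Message` and `ReadResponse`, the "first byte marks a digest" encoding is invented, `countDataReplies` is only a placeholder for whatever `resolve()` does with the cached results, and `ConcurrentHashMap` is swapped in for `NonBlockingHashMap` to avoid the extra dependency.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class CachingResolverSketch
{
    // Hypothetical stand-in for Message; uses identity equals/hashCode.
    static final class Msg
    {
        final byte[] body;
        Msg(byte[] body) { this.body = body; }
    }

    // Hypothetical stand-in for ReadResponse.
    static final class Parsed
    {
        final boolean isDigest;
        Parsed(boolean isDigest) { this.isDigest = isDigest; }
    }

    private final int responseCount;
    private final Map<Msg, Parsed> results = new ConcurrentHashMap<Msg, Parsed>();

    // Counts how often the (pretend) expensive parse runs; for the example only.
    final AtomicInteger deserializations = new AtomicInteger();

    CachingResolverSketch(int responseCount)
    {
        this.responseCount = responseCount;
    }

    // Invented encoding: a first byte of 1 marks a digest reply.
    private Parsed deserialize(byte[] body)
    {
        deserializations.incrementAndGet();
        return new Parsed(body[0] == 1);
    }

    // Parse once, on arrival, and remember the result.
    void preprocess(Msg message)
    {
        results.put(message, deserialize(message.body));
    }

    // Uses only cached results; never parses.
    boolean isDataPresent(Collection<Msg> responses)
    {
        if (responses.size() < responseCount)
            return false;

        for (Msg message : responses)
        {
            Parsed result = results.get(message);
            if (result == null)
                continue; // no cached result for this message yet
            if (!result.isDigest)
                return true;
        }
        return false;
    }

    // Placeholder for resolve(): also works from the cache only.
    int countDataReplies(Collection<Msg> responses)
    {
        int count = 0;
        for (Msg message : responses)
        {
            Parsed result = results.get(message);
            if (result != null && !result.isDigest)
                count++;
        }
        return count;
    }
}
```

With two replies, the parse runs twice in total no matter how many times `isDataPresent` or the resolve step is called afterwards.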