[ https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14385500#comment-14385500 ]
Rajiv Kurian edited comment on KAFKA-2045 at 3/28/15 7:40 PM:
--------------------------------------------------------------

[~jkreps] Totally agree with you on the concerns with a re-write. I am sure I'll end up re-using most of the code, otherwise it will take too long in any case. But given this is just a prototype, I want the freedom to make changes without being bound by the existing architecture and class hierarchy of the client. Even if I do re-implement some of the parts, I'll make sure that the client can (a) do metadata requests so it can react to leaders moving etc., and (b) actually read from multiple topic/partitions spread across multiple brokers and not just a single broker.

Again, since this is just a rewrite with the sole purpose of exploring possible performance improvements, there are mainly two possible outcomes:
i) It shows no improvements: in that case we end up not spending too much time changing the current code, and the hacky code just gets us to this conclusion faster.
ii) It shows interesting improvements: if this were true, we can afford to spend some time seeing which things actually improved performance and make a call on how best to integrate them.

It might be counterproductive to look at the current client implementation and at the % of time spent in each of the bottlenecks, because those numbers are a consequence of the current memory layout. For example, if we do an on-the-fly CRC check and decompression, CRC check time might go up a bit because we are no longer striding over a contiguous ByteBuffer in one sweep. Right now the current client has this pattern: CRC check on Message1 --> CRC check on Message2 --> ... --> CRC check on MessageN --> hand Message1 to consumer --> ... --> hand MessageN to consumer. With the current proposal we would instead have this pattern: do CRC on Message1 --> hand Message1 to consumer --> do CRC on Message2 --> hand Message2 to consumer --> ... So the CRC checks are separated by potential (certain?) cache floundering during the handling of each message by the consumer. On the other hand, from the perspective of the consumer the current pattern looks like this: do CRC and validation on all messages 1 to N --> hand messages 1 to N to the client. By the time the Kafka consumer is done validating and deserializing message N, message 1 is possibly already out of the cache. With the new approach, since we hand over a message right after validating it, we give the consumer a hot-in-cache message, which might improve consumer processing enough to offset the loss in CRC striding efficiency. Or it may not: it might turn out that doing the CRC validation upfront is a pure win, since all the CRC tables will be in cache etc. and the striding access for the CRC math is worth an extra iteration over the ByteBuffer contents.

It might still be more profitable to elide copies and avoid object creation by doing on-the-fly decoding and handing out indexes into the actual response ByteBuffer. This result might further be affected by how expensive the deserialization and processing of each message is. If the message is a bloated JSON-encoded object that is deserialized into a POJO and then processed really slowly, then none of this will probably matter. On the other hand, if the message is compact, binary-encoded, and can be processed with minimal cache misses, this stuff might add up.
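To make the two access patterns above concrete, here is a rough Java sketch. It is illustrative only: MessageView, MessageHandler, and the offset bookkeeping are hypothetical stand-ins, not types from the actual client.

{code:java}
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class CrcPatternSketch {

    /** Hypothetical view of one message inside the fetch response buffer. */
    interface MessageView {
        int offset();        // start of the message bytes in the response buffer
        int length();        // number of bytes covered by the CRC
        long expectedCrc();  // CRC stored in the message header
    }

    /** Hypothetical consumer callback that works directly on buffer indexes. */
    interface MessageHandler {
        void onMessage(ByteBuffer response, int offset, int length);
    }

    static long crcOf(ByteBuffer response, int offset, int length) {
        CRC32 crc = new CRC32();
        // Work on a duplicate so the caller's position/limit are untouched.
        ByteBuffer slice = response.duplicate();
        slice.position(offset).limit(offset + length);
        crc.update(slice);
        return crc.getValue();
    }

    /** Current pattern: validate every message first, then hand them all out. */
    static void validateThenDeliver(ByteBuffer response, MessageView[] messages, MessageHandler handler) {
        for (MessageView m : messages) {     // one contiguous CRC sweep over the buffer
            if (crcOf(response, m.offset(), m.length()) != m.expectedCrc())
                throw new IllegalStateException("CRC mismatch");
        }
        for (MessageView m : messages) {     // by now message 1 may be cold again
            handler.onMessage(response, m.offset(), m.length());
        }
    }

    /** Proposed pattern: validate a message and hand it out while it is still hot in cache. */
    static void validateAndDeliver(ByteBuffer response, MessageView[] messages, MessageHandler handler) {
        for (MessageView m : messages) {
            if (crcOf(response, m.offset(), m.length()) != m.expectedCrc())
                throw new IllegalStateException("CRC mismatch");
            // Consumer sees hot data, but the CRC striding is interrupted by its work.
            handler.onMessage(response, m.offset(), m.length());
        }
    }
}
{code}

Which loop wins is exactly the open question: validateThenDeliver keeps the CRC sweep contiguous, while validateAndDeliver hands the consumer data that is still hot in cache.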
My point is that basing the TODOs on the current profile may not be optimal, because the profile is largely a consequence of the current layout and allocation patterns. Also, the profile gives percentages; we might keep the same percentages and still reduce the overall time taken for the entire consumer processing cycle. To belabor the point even further, the current hash map implementations might suffer so many cache misses that they mask an underlying improvement opportunity for the data in the maps. Switching to compact, primitive-array-based open hash maps might surface that opportunity again.

Is there a performance test that is used to keep track of the new consumer's performance? If so, maybe I can wrap it in a JMH suite and re-use it to test improvements?
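On the JMH question, a minimal harness along these lines is what I have in mind. PerfFixture here is just a hypothetical placeholder for whatever the existing consumer perf test drives; it is not an existing Kafka class.

{code:java}
import org.openjdk.jmh.annotations.*;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@Fork(1)
public class ConsumerPollBenchmark {

    /** Stand-in for the existing consumer perf test; replace with the real fixture. */
    static final class PerfFixture {
        int pollOnce() {
            // Placeholder work so the sketch runs on its own.
            return ThreadLocalRandom.current().nextInt();
        }
        void close() { }
    }

    private PerfFixture fixture;

    @Setup(Level.Trial)
    public void setUp() {
        fixture = new PerfFixture();   // e.g. connect, subscribe, pre-fill the topic
    }

    @Benchmark
    public int pollBatch() {
        // Return the result so the JIT cannot dead-code-eliminate the call.
        return fixture.pollOnce();
    }

    @TearDown(Level.Trial)
    public void tearDown() {
        fixture.close();
    }
}
{code}

Running this through JMH's runner would give per-iteration timings that can be compared before and after the prototype changes.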
> Memory Management on the consumer
> ---------------------------------
>
>         Key: KAFKA-2045
>         URL: https://issues.apache.org/jira/browse/KAFKA-2045
>     Project: Kafka
>  Issue Type: Sub-task
>    Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)