[ https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14383418#comment-14383418 ]
Rajiv Kurian edited comment on KAFKA-2045 at 3/27/15 6:51 AM:
--------------------------------------------------------------

Copying from the email list and expanding here. My proposal is a single RequestBuffer and a single ResponseBuffer per broker per Consumer. We also need one more ByteBuffer to write decompressed message sets to (only one message set at a time). The other part of the proposal is that when we get a complete response we iterate through the ResponseBuffer and hand out pointers into the buffer via the main low-level iterator.

The workflow will look a bit like this:

i) Re-use the same request buffer to create a request and write it to the socket.
ii) On poll, re-use the same response buffer to read in the response till it is complete.
iii) When the response is complete, respond with an iterator over the response ByteBuffer. The consumer must now consume the entire ByteBuffer on this thread, since we use a single mutable iterator to go through it.

It is trickier when we consider that during iteration the consumer might send more Kafka requests and call poll again. I have a proposal to handle this and still allow requests/responses to be pipelined. I have written something like this for another application, and since this is all happening in a single thread it is a bit easier. Here is my proposed design.

The response buffer looks a bit like this:

 ________________________________________
|_______________:___________|_____________}

: is the consumer iterator, i.e. the position of the next message to be consumed. This is always at the start of a new response, a new message set, a new message in a message set, the end of a response, etc., because iterating on the fly means we always move from one such token to the next.
| is the network producer iterator, i.e. the position of the next byte from the broker. This can be any arbitrary byte boundary.
} is the end of the buffer.

Some details:

i) Most of the time the consumer iterator (:) trails the network iterator (|). It catches up once we have consumed all messages.

ii) Sometimes we will have fewer bytes than required for a complete response at the end of the buffer. In such a case we have to wait until there is enough space at the front of the buffer, i.e. until the consumer iterator has moved on far enough to free it, and we write a special value at the index where we skipped to the end. This lets the consumer know that it needs to jump back to the front of the buffer. It means that every response HAS to be prefixed by a special header (a single byte suffices) which says whether the following bytes are a valid response or not: say 1 means valid, 0 means invalid. The consumer only knows there is more to read when the network-producer sequence is ahead of the consumer sequence, and it then either reads the response right there (if the header says 1) or skips to the beginning of the buffer (if the header says 0).

iii) Every time the network producer prepares to write a new response at an index in the buffer, it needs to ensure there are at least 4 bytes (the size field) + 1 byte for the header + some minimum heuristic amount before it considers the buffer slice usable. If the slice is not usable it writes the skip-ahead header (0) and advances its sequence to point exactly at the end of the buffer. Once the network producer finds enough space in the buffer, it should wait till at least 4 bytes are read so that it definitively knows the response size. Once it has read the size, it knows exactly how many contiguous bytes are required (size of response + 1 byte for the header) and can decide with certainty whether to continue with the slice it has (i.e. from the current position to the end of the buffer) or to write the skip-ahead header (0) and wait till it gets more contiguous space. If it can continue, it waits till the entire response is read into the buffer (i.e. bytes read == size of response). When this happens, it advances its sequence by size of response + 1 (1 for the header) and also sets the header to 1 to indicate that there is a readable response.

iv) A ConsumerRecordIterator is only reset/created once we have an entire contiguous response. Each ConsumerRecordIterator has a pointer to the beginning of the response and its size. The iterator hands out ConsumerRecord messages (or reuses them). Each ConsumerRecord in turn has a pointer to the beginning of the message it refers to and a size (or a pointer to its end), plus a mutable reference field for the topic and an int for the partition. All fields are mutable so that these flyweights can be re-used.

v) Once an entire response has been iterated through (i.e. bytes iterated == size of response), we advance the consumer iterator by the size of the response + 1 (for the header). This cycle continues, and all we need is a single buffer and a few longs to manage all the sequences. A sketch of this bookkeeping follows this list.
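To make the sequence arithmetic concrete, here is a minimal single-threaded sketch in Java. Everything in it is hypothetical: the class name ResponseRingBuffer and the claim/commit/nextResponse/release methods are invented for illustration, and actual response parsing and socket I/O are elided. It only demonstrates the two long sequences and the skip-ahead header described above:

{code:java}
import java.nio.ByteBuffer;

/**
 * Hypothetical sketch of the proposed response ring buffer. The network
 * producer and the consumer run on the same thread, so two plain longs
 * are enough to track both positions.
 */
public class ResponseRingBuffer {

    private static final byte HEADER_VALID = 1; // a complete response follows
    private static final byte HEADER_SKIP  = 0; // wasted tail: wrap to front

    private final ByteBuffer buffer;
    private long producerSequence; // next byte the network producer writes
    private long consumerSequence; // next byte the consumer reads

    public ResponseRingBuffer(int capacity) {
        this.buffer = ByteBuffer.allocateDirect(capacity);
    }

    private int index(long sequence) {
        return (int) (sequence % buffer.capacity());
    }

    /**
     * Called once the 4-byte size prefix has been read, so the response
     * size is known. Returns the buffer index where the response body
     * should be written, wrapping past an unusable tail if needed, or -1
     * if the consumer has not yet freed enough space (retry later).
     */
    public int claim(int responseSize) {
        int required = 1 + responseSize; // header byte + response bytes
        long free = buffer.capacity() - (producerSequence - consumerSequence);
        int pos = index(producerSequence);
        int contiguous = buffer.capacity() - pos;
        if (contiguous < required) {
            // The tail is too small: mark it skippable and jump to the front,
            // but only if the front already has room for the whole response.
            if (free < contiguous + required) {
                return -1;
            }
            buffer.put(pos, HEADER_SKIP);
            producerSequence += contiguous; // now points at index 0
            pos = 0;
        } else if (free < required) {
            return -1;
        }
        return pos + 1; // body starts just past the header byte
    }

    /** Called when bytes read == size of response: publish it. */
    public void commit(int responseSize) {
        buffer.put(index(producerSequence), HEADER_VALID);
        producerSequence += 1 + responseSize;
    }

    /** Consumer side: index of the next readable response body, or -1. */
    public int nextResponse() {
        while (producerSequence > consumerSequence) {
            int pos = index(consumerSequence);
            if (buffer.get(pos) == HEADER_SKIP) {
                consumerSequence += buffer.capacity() - pos; // hop the tail
            } else {
                return pos + 1;
            }
        }
        return -1; // nothing published yet
    }

    /** Called when bytes iterated == size of response. */
    public void release(int responseSize) {
        consumerSequence += 1 + responseSize;
    }
}
{code}

Note that a skipped tail is published to the consumer implicitly: the producer advances its sequence past it at claim time, and the consumer hops over it the next time it looks for a response.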
I might not have been very clear with the explanation, but the tl;dr version is that this is a circular buffer with a twist: it handles variable-sized messages. Since the sequence of message sizes means we may not have enough contiguous space at the end of the buffer for an entire response, we need some trickery to communicate this scenario to the consumer and instruct it to skip past the temporarily wasted space. That trickery is implemented in the form of a header per response. I will be happy to answer questions about the design, and to provide some diagrams showing typical runs.

Like Jay suggested, if application-level deserialization is a bottleneck that needs to be solved by passing slices of these ByteBuffers out to a pool of threads, then this approach WILL NOT work, since we expect the ByteBuffer to be iterated linearly in one go. If we want slices passed to a pool of threads, then copying each individual message into a new ByteBuffer is probably the only good option. For my application that is definitely not the case: deserialization is free, and the cache friendliness of iterating over a hot buffer trumps every other factor.

To solve for that use case, the low-level iterator discussed in the previous paragraphs can be wrapped in a higher-level iterator that just copies the bytes of each message into a new ByteBuffer and hands it over. The low-level iterator is still responsible for buffer management; the higher-level iterator is just a wrapper over it that consumes a message by copying its bytes into a new ByteBuffer and handing that ByteBuffer to the application. The application is then free to transfer these to other threads for processing/deserialization. There is still a caveat with the higher-level iterator: it MUST still be consumed in its entirety on the thread that called poll(), even though the ByteBuffers it hands out can be sent to other threads. A sketch of both layers follows.
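Here is an equally hypothetical sketch of the two iterator layers: none of these class names exist in the Kafka codebase, and the bare 4-byte size prefix per message is an assumption made to keep the framing trivial, not the real wire format. The low-level iterator re-positions a single mutable flyweight over the shared response buffer; the high-level wrapper copies each message into a fresh ByteBuffer that is safe to hand to another thread:

{code:java}
import java.nio.ByteBuffer;
import java.util.Iterator;

/** Mutable flyweight pointing into the shared response buffer. */
final class RecordFlyweight {
    ByteBuffer buffer; // the shared response buffer
    int offset;        // start of the current message's bytes
    int size;          // size of the current message's bytes
    String topic;      // mutable so the flyweight can be re-used; populating
    int partition;     // topic/partition from the response is elided here
}

/** Walks one contiguous response, re-using a single flyweight. */
final class LowLevelIterator implements Iterator<RecordFlyweight> {
    private final RecordFlyweight flyweight = new RecordFlyweight();
    private ByteBuffer response;
    private int position; // next unread index within the response
    private int end;      // one past the last byte of the response

    /** Reset/re-point at a complete contiguous response. */
    void reset(ByteBuffer response, int offset, int size) {
        this.response = response;
        this.position = offset;
        this.end = offset + size;
        this.flyweight.buffer = response;
    }

    @Override
    public boolean hasNext() {
        return position < end;
    }

    @Override
    public RecordFlyweight next() {
        int messageSize = response.getInt(position); // assumed size prefix
        flyweight.offset = position + 4;
        flyweight.size = messageSize;
        position += 4 + messageSize;
        return flyweight; // same instance every time: use it before next()
    }
}

/** Wrapper that copies each message so it can leave the polling thread. */
final class CopyingIterator implements Iterator<ByteBuffer> {
    private final LowLevelIterator delegate;

    CopyingIterator(LowLevelIterator delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public ByteBuffer next() {
        RecordFlyweight record = delegate.next();
        ByteBuffer copy = ByteBuffer.allocate(record.size);
        for (int i = 0; i < record.size; i++) {
            copy.put(record.buffer.get(record.offset + i)); // absolute gets
        }
        copy.flip();
        return copy; // an independent copy, safe to pass to a worker thread
    }
}
{code}

Both layers still have to be driven (hasNext()/next()) on the thread that called poll(); only the copies the high-level iterator hands out may cross threads.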
This doesn't seem like much of a limitation, especially if the API is very clear that the iterator itself is not thread safe. I understand that this is a big, big change, but based on your feedback I'd be happy to work on a prototype once I have the license to do so from my bosses. Given our priorities I think that will happen soon. Would love to get more feedback here.
> Memory Management on the consumer
> ---------------------------------
>
>                 Key: KAFKA-2045
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2045
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)