Rajiv,

Those are good points. As for the implementation, we have developed a class in the producer that can probably be re-used for the consumer as well:
org.apache.kafka.clients.producer.internals.BufferPool

Please feel free to add more comments on KAFKA-2045.

Guozhang

On Tue, Mar 24, 2015 at 12:21 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hi Guozhang,
>
> Yeah, the main motivation is to not require de-serialization but still allow the consumer to de-serialize into objects if it really wants to. Another motivation for iterating over the ByteBuffer on the fly is that we can prevent copies altogether. This has an added implication, though. Since we re-use the same response buffer continuously and iterate over it, the iteration has to be done in a single-threaded manner, specifically on the thread that did the poll. The workflow will look a bit like this:
>
> i) Re-use the same request buffer to create a request and write it to the socket.
> ii) On poll, re-use the same response buffer to read in the response till it is complete.
> iii) When the response is complete, respond with an iterator to the response ByteBuffer. The consumer must now consume the entire ByteBuffer on this thread, since we use a single mutable iterator to go through the ByteBuffer.
>
> It is trickier when we consider that during iteration the consumer might send more Kafka requests and call poll further. We can maintain a pointer to the end of the response ByteBuffer to allow more responses to stream in and essentially use the response buffer as a circular buffer. So basically there would be two pointers - iteration_pointer and network_append_pointer - with iteration_pointer <= network_append_pointer. If the network_append_pointer reaches the end of the buffer, then we cannot accept any more data until the iteration_pointer has progressed further. Since responses have to be contiguous, we cannot have a response straddle the circular buffer's end. We'd have to detect this case and copy the response to the beginning of the buffer so that it is contiguous.
> Though this is a bit more complicated, it ensures that we don't have to move in lockstep and can pipeline requests/responses. I have written something like this for another application, and since this is all happening in a single thread it is a bit easier and, I think, possible.
>
> Like Jay suggested, if application-level deserialization is a bottleneck that needs to be solved by passing slices of these ByteBuffers out to a pool of threads, then this approach WILL NOT work, since we expect the ByteBuffer to be linearly iterated in one go. If we want slices to be passed to a pool of threads, then copying to new ByteBuffers is probably the only good option. For my application that is definitely not the case, since deserialization is free and the cache friendliness of iterating over a hot buffer trumps every other factor. But for applications with more involved deserialization we can re-use the above framework. The lower-level iterator (discussed in paragraphs 2 and 3) can be wrapped in a higher-level iterator that just copies the bytes for each message to a new ByteBuffer and hands them over. The low-level iterator is still responsible for buffer management, and the higher-level iterator is just a plain old consumer that copies the bytes to a new ByteBuffer and hands them to the application. The application is now free to transfer these to other threads for processing/deserialization.
>
> Thanks for creating KAFKA-1326. Let me know what your thoughts are on this proposed design.
>
> Thanks,
> Rajiv
>
> On Tue, Mar 24, 2015 at 9:22 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Rajiv,
> >
> > Just want to clarify: the main motivation for iterating over the byte buffer directly instead of iterating over the records is to not enforce de-serialization, right?
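The two-pointer circular-buffer scheme described above could look roughly like this. This is an illustrative sketch, not Kafka code; the ResponseRing name and its fields are made up to mirror the iteration_pointer/network_append_pointer idea:

```java
import java.nio.ByteBuffer;

// Sketch: one response buffer used as a circular buffer with two pointers.
// Everything in [iterPos, appendPos) is unconsumed response data.
public class ResponseRing {
    private final ByteBuffer buf;
    private int iterPos = 0;    // iteration_pointer: next byte to consume
    private int appendPos = 0;  // network_append_pointer: next byte to fill

    public ResponseRing(int capacity) {
        buf = ByteBuffer.allocate(capacity);
    }

    // Append one complete response. If it would straddle the end of the
    // buffer, first copy the unconsumed region to the front so every
    // response stays contiguous.
    public boolean append(byte[] response) {
        if (response.length > buf.capacity() - appendPos) {
            int unconsumed = appendPos - iterPos;
            if (unconsumed + response.length > buf.capacity()) {
                return false; // iteration must progress before we take more
            }
            System.arraycopy(buf.array(), iterPos, buf.array(), 0, unconsumed);
            iterPos = 0;
            appendPos = unconsumed;
        }
        buf.position(appendPos);
        buf.put(response);
        appendPos += response.length;
        return true;
    }

    // The iterator consumed n bytes from the front of the unconsumed region.
    public void consume(int n) {
        iterPos += n;
        if (iterPos == appendPos) { iterPos = appendPos = 0; } // fully drained
    }

    public int unconsumed() { return appendPos - iterPos; }
}
```

The `append` returning false is the back-pressure case from the email: the network side must wait for the iterator to catch up.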
> > I think that can be done by passing the deserializer class info into the consumer record instead of putting the de-serialized objects into the consumer, and only doing de-serialization on the fly when key/value fields are requested, while exposing another API to return the underlying raw bytes from the record. What do you think?
> >
> > As for de-compression, today we allocate a new buffer for each de-compressed message, and it may be required to do de-compression with a re-usable buffer with memory control. I will create a ticket under KAFKA-1326 for that.
> >
> > Guozhang
> >
> > On Sun, Mar 22, 2015 at 1:22 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks for the note. So if we do not deserialize till the last moment, like Jay suggested, we would not need extra buffers for deserialization. Unless we need random access to messages, it seems like we can deserialize right at the time of iteration and allocate objects only if the consumer actually requires deserialized objects. Some applications like mine can just utilize the ByteBuffer contents, and in those cases we can take in a ByteBuffer or a ConsumerRecord which points to a ByteBuffer and make it point to the right slice of our response buffer. This ByteBuffer can be re-used over and over again.
> > >
> > > As for decompression - that will require an extra buffer allocated at startup, but not new allocations per response. I imagine the steps would be something like this:
> > >
> > > i) Receive the entire response, which includes possibly compressed messages, into ResponseBuffer. ResponseBuffer is only allocated once.
> > > ii) Once we have received an entire response, return an iterator to the underlying ByteBuffer.
> > > iii) On each message set, check whether it is compressed or not.
> > > iv) If compressed, decompress only that message set in a streaming manner and store the result in another preallocated buffer, DecompressedMessagesBuffer. When the consumer asks for the next message, alter the flyweight that it supplies to point to the correct slice in the DecompressedMessagesBuffer. Do this till all decompressed messages from that message set are consumed.
> > > v) If the message set was not compressed, then alter the flyweight to point to the correct slice in the ResponseBuffer. Do this till all messages from this message set are consumed.
> > >
> > > So basically we will switch between the two buffers when messages are compressed, and otherwise stay only on the ResponseBuffer. Another nice thing about the lazy approach is that we can do CRC validation right before the message gets consumed too. This means the CRC algorithm will scan bytes that are hot in cache (since we are iterating a linear array of bytes), and as soon as we have checked the bytes they will be consumed by the application, so they will stay hot in the L1 cache.
> > >
> > > All of this "theory crafting" is based on my currently incomplete understanding of the Kafka protocol, but it seems like compression is per message set, so we can stream through. Also, since in general we can iterate through the response buffer, we can do CRC validation right before the message is consumed.
> > >
> > > Thanks,
> > > Rajiv
> > >
> > > On Sun, Mar 22, 2015 at 10:29 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Rajiv,
> > > >
> > > > A side note on re-using ByteBuffers: in the new consumer we do plan to add a memory management module such that it will try to reuse allocated buffers for fetch responses.
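Step iv) hinges on decompressing a message set into a buffer that is allocated once and re-used. A minimal sketch of that idea, using the JDK's Inflater as a stand-in codec (Kafka's actual codecs differ; ReusableDecompressor and its method names are made up for illustration):

```java
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Sketch: decompress each compressed "message set" into a single buffer
// allocated once up front (the DecompressedMessagesBuffer analogue).
public class ReusableDecompressor {
    private final Inflater inflater = new Inflater();
    private final byte[] decompressed; // allocated once, re-used per message set

    public ReusableDecompressor(int maxDecompressedSize) {
        decompressed = new byte[maxDecompressedSize];
    }

    // Decompress one message set; returns how many bytes of buffer() are valid.
    public int decompress(byte[] compressed, int offset, int len)
            throws DataFormatException {
        inflater.reset(); // re-use the inflater between message sets
        inflater.setInput(compressed, offset, len);
        return inflater.inflate(decompressed);
    }

    public byte[] buffer() { return decompressed; }

    // Helper for demo/testing: compress bytes with the matching Deflater.
    public static byte[] compress(byte[] in) {
        Deflater d = new Deflater();
        d.setInput(in);
        d.finish();
        byte[] out = new byte[in.length + 64];
        int n = d.deflate(out);
        byte[] exact = new byte[n];
        System.arraycopy(out, 0, exact, 0, n);
        return exact;
    }
}
```

After `decompress`, the consumer's flyweight would be pointed at slices of `buffer()` instead of the response buffer, exactly as step iv) describes.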
> > > > But as Jay mentioned, for now de-serialization and de-compression are done inside the poll() call, which requires allocating another buffer to write the de-serialized and de-compressed bytes into; hence even with fetch response buffer management we would still need to allocate new buffers today if compressed messages are delivered.
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > Zijing, the new consumer will be in the next release. We don't have a hard date for this yet.
> > > > >
> > > > > Rajiv, I'm game if we can show a >= 20% performance improvement. It certainly could be an improvement, but it might also be that the CRC validation and compression dominate.
> > > > >
> > > > > The first step would be https://issues.apache.org/jira/browse/KAFKA-1895
> > > > >
> > > > > This would delay the deserialization of ConsumerRecords and make it basically a wrapper around the actual MemoryRecords chunks, which are basically ByteBuffer instances. We could then add an optional ConsumerRecords param in poll to allow you to hand back your ConsumerRecords instance. Optimizing the ConsumerRecord instance reuse would also be possible.
> > > > >
> > > > > That JIRA is actually useful irrespective of these optimizations, though, because deferring the deserialization and decompression would allow you to punt that work into a pool of processor threads. Since it is more common to see the real bottleneck be application serialization, this could be valuable.
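The deferred-deserialization idea discussed above (a record that carries its deserializer and raw bytes, deserializing only on access) can be sketched like this. LazyRecord is a hypothetical name, not part of the Kafka API:

```java
import java.util.function.Function;

// Sketch: a record that keeps raw bytes and only deserializes when the
// value is actually requested; rawValue() exposes the bytes directly, so
// callers that can work on bytes never pay for deserialization.
public class LazyRecord<V> {
    private final byte[] raw;
    private final Function<byte[], V> deserializer;
    private V value; // cached after first access

    public LazyRecord(byte[] raw, Function<byte[], V> deserializer) {
        this.raw = raw;
        this.deserializer = deserializer;
    }

    public byte[] rawValue() {
        return raw; // no deserialization, no allocation
    }

    public V value() {
        if (value == null) {
            value = deserializer.apply(raw); // deserialize on demand
        }
        return value;
    }
}
```

This also makes it safe to hand records to a pool of processor threads and let them pay the deserialization cost there, which is the point Jay raises about KAFKA-1895.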
> > > > > WRT the object reuse, I wouldn't be shocked to learn that you actually get equivalent results out of the JVM allocator's own pooling and/or escape analysis once we are doing the allocation on demand. So it would be good to show a real performance improvement on the newer JVMs before deciding to go this route.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > > > >
> > > > > > Just a follow-up - I have implemented a pretty hacky prototype. It's too unclean to share right now, but I can clean it up if you are interested. I don't think it offers anything that people don't already know about, though.
> > > > > >
> > > > > > My prototype doesn't do any metadata requests yet, but I have a flyweight builder/parser for the FetchRequest and FetchResponse protocol messages that I based on the protocol page at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol.
> > > > > > On my consumer thread I allocate two buffers per broker: a small request buffer and a bigger response buffer. My assumption is that the response buffer will be big enough that it is never too small for the response. If this assumption doesn't hold, we would have to reallocate a bigger buffer. These are the steps I am following right now:
> > > > > >
> > > > > > i) Whenever a client makes a request, I fill up the relevant request buffer(s) using my flyweight object instead of allocating an object and then serializing it. I then write this buffer to the SocketChannel connected to the right Kafka broker.
> > > > > > I don't yet handle the case where all of the request bytes could not be written synchronously. If I were to handle that properly, I would need to register WRITE interest on the SocketChannel on incomplete writes.
> > > > > >
> > > > > > ii) Whenever the client calls poll(), I register read interest on the socket channel and make a select call on the underlying selector. If the selector says that the socket channel is readable, I read all the bytes possible into my response ByteBuffer. Once I can read the first 4 bytes, I know how big the response is, so I wait till all response bytes have been read before attempting to parse it. This might take multiple poll calls. Once I have received the expected number of bytes, I iterate through the response ByteBuffer. There is no random access provided to the ByteBuffer, since there are way too many variable-length fields. This is a DISADVANTAGE of doing flyweight-style parsing instead of deserialization into POJOs with indexed data structures like maps. I could build indexing with a non-allocating primitive hashmap like the Koloboke one, mapping Topic/Partition/Messages to offsets in the ByteBuffer if really required. For me the lack of random access has not been a problem at all. Iteration is done just by providing a ByteBuffer and offset (within the ByteBuffer) pair for each message. In my own application I wrap this ByteBuffer and offset pair in my own flyweight, which knows how to decode the data.
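The accumulation in step ii) - waiting for a size-prefixed response to complete across multiple poll calls - can be sketched like this. An illustrative sketch only: in a real client `offer` would be driven by `SocketChannel.read` on a readable channel, and the names are made up:

```java
import java.nio.ByteBuffer;

// Sketch: accumulate a size-prefixed response across multiple reads. The
// first 4 bytes carry the payload length; complete() becomes true once
// all payload bytes have arrived.
public class ResponseAccumulator {
    private final ByteBuffer buf;

    public ResponseAccumulator(int capacity) {
        buf = ByteBuffer.allocate(capacity);
    }

    // Feed whatever bytes the latest read delivered (possibly partial).
    public void offer(byte[] chunk) {
        buf.put(chunk);
    }

    // We only know the total size once the 4-byte prefix has arrived.
    public boolean complete() {
        if (buf.position() < 4) return false;
        int payloadLen = buf.getInt(0); // absolute read, no side effects
        return buf.position() >= 4 + payloadLen;
    }

    // Expose the payload as a view once complete; the buffer can then be
    // cleared and re-used for the next response.
    public ByteBuffer payload() {
        ByteBuffer view = buf.duplicate();
        view.position(4);
        view.limit(4 + buf.getInt(0));
        return view.slice();
    }
}
```

Note that `payload()` is a view over the same backing bytes, not a copy, which preserves the zero-copy property the email is after.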
> > > > > > iii) Once an entire response has been iterated through, I can re-use both the request and the response buffers.
> > > > > >
> > > > > > I am sure this can be improved upon a lot. I allocate the following before starting my client:
> > > > > > i) A direct ByteBuffer for requests.
> > > > > > ii) A direct ByteBuffer for responses. Sizes of the ByteBuffers are chosen so that they can fit the biggest requests/responses we expect.
> > > > > > iii) A flyweight that wraps the request ByteBuffer and can be used to write a particular kind of request. So far I have only written a flyweight for a FetchRequest.
> > > > > > iv) A flyweight that wraps the response ByteBuffer and can be used to iterate through the entire response, including finding errors. There is no random access allowed right now. So far I have only written a flyweight parser for a FetchResponse.
> > > > > > v) A flyweight for every type of application-level message that I expect. Iterating through the response ByteBuffer using the flyweight in (iv), I get offsets into the ByteBuffer for each individual message. My own messages work by using absolute-position getLong(position), getShort(position), etc. calls on a ByteBuffer, so this works out great for me. We could alternatively provide an API that allocates a new ByteBuffer and copies the data for people who don't want the zero-allocation access.
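An application-level flyweight of the kind described in (v) might look roughly like this for a hypothetical fixed message layout (the [long id][short version] layout and all names are invented for illustration):

```java
import java.nio.ByteBuffer;

// Sketch: a flyweight over a hypothetical fixed layout of
// [long id][short version], read with absolute getters so the same
// object can be re-pointed at any offset with zero allocation.
public class MessageFlyweight {
    private ByteBuffer buf;
    private int offset;

    // Re-point the flyweight at a message; no copies, no allocation.
    public MessageFlyweight wrap(ByteBuffer buf, int offset) {
        this.buf = buf;
        this.offset = offset;
        return this;
    }

    public long id() {
        return buf.getLong(offset);        // bytes 0-7 of the message
    }

    public short version() {
        return buf.getShort(offset + 8);   // bytes 8-9 of the message
    }
}
```

Because only absolute `getLong(position)`/`getShort(position)` calls are used, the buffer's own position and limit are never disturbed, which is what lets one flyweight walk an entire response.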
> > > > > > Sadly, in my profiling I noticed that the selector implementation in the JDK allocates, but it seems like projects like Netty and Aeron have worked around this by using reflection to replace the underlying implementation with a non-allocating one. Other than that, I have absolutely zero allocations past the initial 4-5 allocations. I also have absolutely zero copies in user space once the data lands from the socket onto the ByteBuffer.
> > > > > >
> > > > > > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > > > > >
> > > > > > > I had a few more thoughts on the new API. Currently we use Kafka to transfer really compact messages - around 25-35 bytes each. Our use case is a lot of messages, but each very small. Will it be possible to reuse a ConsumerRecord and the ConsumerRecords objects? We employ our own binary encoding, and our messages are encoded as flyweights in SBE/Cap'n Proto style, so they can be consumed without any decoding/deserialization step. Currently we use the SimpleConsumer and have found the fact that it hands out ByteBuffers very useful, instead of a mandatory deserialization-into-a-POJO step. Even then, through memory profiling we have found that the ByteBuffers and the records take more space than the actual messages themselves.
> > > > > > > Ideally we could allocate a big ByteBuffer (maybe Kafka already does this) to receive our data and then just get some kind of flyweight iterator on the ByteBuffer, something like this:
> > > > > > >
> > > > > > > // Allocate a single ByteBuffer, or have Kafka allocate it internally.
> > > > > > > // Either way it would be very useful to just keep re-using this buffer.
> > > > > > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > > > > > // Create a couple of flyweights.
> > > > > > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > > > > > ConsumerRecord record = new ConsumerRecord();
> > > > > > > // Subscribe to the topics we care about.
> > > > > > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > > > > > while (running) {
> > > > > > >   // Use the buffer to get data from the server, re-using the same
> > > > > > >   // iterator. Resets the iterator to point to the start of the ByteBuffer.
> > > > > > >   consumer.poll(buffer, iterator, timeout);
> > > > > > >   // Now the iterator has pointers into the ByteBuffer and is capable
> > > > > > >   // of advancing the cursor to read every message.
> > > > > > >   while (iterator.hasNext()) {
> > > > > > >     // The consumer record is just a flyweight over the ByteBuffer and
> > > > > > >     // is adjusted to point to the start of the next record.
> > > > > > >     iterator.getNextInto(record);
> > > > > > >     // This is not a new ByteBuffer; it's just the big buffer with its
> > > > > > >     // position and limit adjusted so that we only read the current record.
> > > > > > >     // Alternatively we could give it our own ByteBuffer, but the point
> > > > > > >     // would be to not do a copy and instead adjust the supplied
> > > > > > >     // ByteBuffer's position, limit, and pointer (through reflection) to
> > > > > > >     // point to the right slice of the actual big ByteBuffer.
> > > > > > >     // This means that we cannot stash this buffer in a hash map or any
> > > > > > >     // heap-allocated structure, since its contents keep changing as we iterate.
> > > > > > >     ByteBuffer currentRecordBuffer = record.getUnderlying();
> > > > > > >     // process() cannot keep a reference to the buffer - this is really
> > > > > > >     // the programmer's responsibility.
> > > > > > >     process(currentRecordBuffer);
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > > Given how the new consumer is meant to be used from a single thread, I think these optional non-allocating methods would be a great boon for anyone trying to save memory or prevent heap churn. This has exactly zero copies in user space too, which is great for performance. In our case, since the messages are very tiny, we end up spending more memory on all the wrapper objects than on the actual messages, so this would be a game changer for us. I'd love to be able to contribute if this seems sensible. Hopefully these non-allocating methods can co-exist with the allocating ones, and only users who absolutely need them make the trade-off of better efficiency/performance for a slightly more error-prone and ugly API.
> > > > > > >
> > > > > > > Thoughts?
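The `record.getUnderlying()` idea in the pseudocode above - handing out a window onto the big buffer by moving position and limit rather than copying - can be made concrete like this (RecordWindow is a made-up name; it is a sketch of the technique, not a proposed API):

```java
import java.nio.ByteBuffer;

// Sketch: expose each record as a moving "window" onto the big response
// buffer by adjusting position and limit on one shared view, instead of
// copying bytes out. Callers must not stash the returned buffer: its
// window moves on the next call.
public class RecordWindow {
    private final ByteBuffer view; // duplicate shares the same backing bytes

    public RecordWindow(ByteBuffer big) {
        this.view = big.duplicate();
    }

    // Point the shared view at [start, start + length).
    public ByteBuffer windowAt(int start, int length) {
        view.limit(start + length); // set limit first so position stays valid
        view.position(start);
        return view;
    }
}
```

This is exactly the hazard the email calls out: because the same object is returned every time, stashing it in a map or handing it to another thread silently reads the wrong record.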
> > > > > > > Thanks,
> > > > > > > Rajiv
> > > > > > >
> > > > > > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo <alter...@yahoo.com.invalid> wrote:
> > > > > > >
> > > > > > >> Hi all,
> > > > > > >> The document is very beautiful. Which Kafka release will this be in, and what is the timeline?
> > > > > > >> Thanks,
> > > > > > >> Edwin
> > > > > > >>
> > > > > > >> On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > > > > > >>
> > > > > > >> Awesome - can't wait for this version to be out!
> > > > > > >>
> > > > > > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > >>
> > > > > > >> > The timeout in the poll call is more or less the timeout used by the selector. So each call to poll will do socket activity on any ready sockets, waiting for up to that time for a socket to be ready. There are no longer any background threads involved in the consumer; all activity is driven by the application thread(s).
> > > > > > >> >
> > > > > > >> > The max fetch request wait time is controlled with a config and is independent of the time given to poll.
> > > > > > >> >
> > > > > > >> > -Jay
> > > > > > >> >
> > > > > > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > > > > > >> >
> > > > > > >> > > I am trying to understand the semantics of the timeout specified in the poll method in http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > >> > > Is this timeout a measure of how long the fetch request will be parked on the broker waiting for a reply, or is it more like the timeout in selector.select(long timeout), i.e. the method returns with whatever data is there after waiting at most the timeout? Exposing the selector timeout would be very helpful for us, because we want to put a tight bound on how long we are ready to wait on the poll call. When this API is available, we plan to use a single thread to get data from Kafka, process it, and run periodic jobs. For the periodic jobs to run, we need a guarantee on how much time the poll call can take at most.
> > > > > > >> > >
> > > > > > >> > > Thanks!
> > > > > > >> > >
> > > > > > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > > > > > >> > >
> > > > > > >> > > > Thanks!
> > > > > > >> > > > On Thursday, March 19, 2015, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > >> > > >
> > > > > > >> > > >> Err, here: http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > >> > > >>
> > > > > > >> > > >> -Jay
> > > > > > >> > > >>
> > > > > > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > >> > > >>
> > > > > > >> > > >> > The current work in progress is documented here:
> > > > > > >> > > >> >
> > > > > > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > > > > > >> > > >> >
> > > > > > >> > > >> >> Is there a link to the proposed new consumer non-blocking API?
> > > > > > >> > > >> >>
> > > > > > >> > > >> >> Thanks,
> > > > > > >> > > >> >> Rajiv

--
-- Guozhang