Hi Rajiv,

Just to clarify: the main motivation for iterating over the byte buffer directly, instead of iterating over the records, is to avoid forcing deserialization, right? I think we can achieve that by passing the deserializer class info into the consumer record instead of putting the deserialized objects into it, deserializing on the fly only when the key/value fields are requested, and exposing another API that returns the underlying raw bytes from the record. What do you think?
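A rough sketch of what I have in mind (all names here are hypothetical, not the actual consumer API): the record keeps the raw bytes plus a deserializer, deserializes only on first access to the typed value, and exposes the raw bytes directly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical lazy record: holds raw bytes and a deserializer, and only
// materializes the typed value when it is actually requested.
class LazyRecord<V> {
    interface Deserializer<V> { V deserialize(ByteBuffer raw); }

    private final ByteBuffer raw;              // slice of the fetch response
    private final Deserializer<V> deserializer;
    private V value;                           // filled in lazily

    LazyRecord(ByteBuffer raw, Deserializer<V> deserializer) {
        this.raw = raw;
        this.deserializer = deserializer;
    }

    // Raw-bytes API: no deserialization is forced on the caller.
    ByteBuffer rawValue() { return raw.duplicate(); }

    // Deserialization happens on the fly, only when the value is requested.
    V value() {
        if (value == null) value = deserializer.deserialize(raw.duplicate());
        return value;
    }
}

public class LazyRecordDemo {
    public static void main(String[] args) {
        ByteBuffer bytes = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        LazyRecord<String> r = new LazyRecord<>(bytes,
                b -> StandardCharsets.UTF_8.decode(b).toString());
        System.out.println(r.rawValue().remaining()); // 5, no deserialization done
        System.out.println(r.value());                // hello, deserialized on demand
    }
}
```

An application that only wants the bytes never pays the deserialization cost; one that calls value() pays it exactly once per record.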
As for decompression: today we allocate a new buffer for each decompressed message, and we may need to support decompressing into a reusable buffer with memory control. I will create a ticket under KAFKA-1326 for that.

Guozhang

On Sun, Mar 22, 2015 at 1:22 PM, Rajiv Kurian <ra...@signalfuse.com> wrote: > Hi Guozhang, > > Thanks for the note. So if we do not deserialize till the last moment like > Jay suggested we would not need extra buffers for deserialization. Unless > we need random access to messages it seems like we can deserialize right at > the time of iteration and allocate objects only if the Consumer actually > requires deserialized objects. Some applications like mine can just utilize > the ByteBuffer contents and in those cases we can take in a ByteBuffer or a > ConsumerRecord which points to a ByteBuffer and make it point to the right > slice of our response buffer. This ByteBuffer can be re-used over and over > again. > > As for decompression - that will require an extra buffer allocated at > startup, but not new allocations per response. I imagine the steps would be > something like this: > > i) Receive entire response which includes possibly compressed messages on > ResponseBuffer. ResponseBuffer is only allocated once. > ii) Once we have received an entire response, return an iterator to the > underlying ByteBuffer. > iii) On each message set check if it is compressed or not. > iv) If compressed decompress only that message set in a streaming manner > and store the result in another preallocated buffer > DecompressedMessagesBuffer. When the consumer asks for the next message > alter the flyweight that it supplies you to point to the correct slice in > the DecompressedMessagesBuffer. Do this till all decompressed messages from > that message set are consumed. > v) If the message set was not compressed, then alter the flyweight to point > to the correct slice in the ResponseBuffer. 
Do this till all messages from > this message set are consumed. > > So basically we will switch between the two buffers when messages are > compressed or otherwise stay only on the ResponseBuffer. With the lazy > approach another nice thing is we can do CRC validation right before the > message gets consumed too. Which means that the CRC algorithm will scan > some bytes which will be hot in cache (since we are iterating a linear > array of bytes) and as soon as we have checked the bytes they will be > consumed by the application which means they will stay hot in L1 cache. > > All of this "theory crafting" is based on my currently incomplete > understanding of the kafka protocol but it seems like compression is per > message set so we can stream through. Also since in general we can iterate > through the response buffer, we can do CRC validation right before the > message is consumed. > > Thanks, > Rajiv > > On Sun, Mar 22, 2015 at 10:29 AM, Guozhang Wang <wangg...@gmail.com> > wrote: > > > Rajiv, > > > > A side note for re-using ByteBuffer: in the new consumer we do plan to > add > > some memory management module such that it will try to reuse allocated > > buffer for fetch responses. But as Jay mentioned, for now inside the > poll() > > call de-serialization and de-compression is done which requires to > allocate > > another buffer to write the de-serialized and de-compressed bytes, hence > > even with fetch response buffer management today we still need to > allocate > > new buffers if compressed messages are delivered. > > > > Guozhang > > > > On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <jay.kr...@gmail.com> wrote: > > > > > Zijing, the new consumer will be in the next release. We don't have a > > hard > > > date for this yet. > > > > > > Rajiv, I'm game if we can show a >= 20% performance improvement. It > > > certainly could be an improvement, but it might also be that the CRC > > > validation and compression dominate. 
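The two-buffer scheme in the steps above can be sketched as follows (a minimal stand-in: the "decompressor" just copies bytes so the example is self-contained, and every name here is made up):

```java
import java.nio.ByteBuffer;

// Flyweight: just (buffer, offset, length), repointed per message, no copies.
class MessageFlyweight {
    ByteBuffer buffer; int offset; int length;
    void pointTo(ByteBuffer b, int off, int len) { buffer = b; offset = off; length = len; }
    byte byteAt(int i) { return buffer.get(offset + i); } // absolute-position read
}

public class TwoBufferDemo {
    static final ByteBuffer responseBuffer = ByteBuffer.allocate(1 << 20);      // filled once per fetch
    static final ByteBuffer decompressedBuffer = ByteBuffer.allocate(1 << 20);  // reused scratch buffer

    // Stand-in for a real streaming decompressor (e.g. GZIP/Snappy): here it
    // just copies bytes so the sketch runs without a codec.
    static int decompressInto(ByteBuffer src, int off, int len, ByteBuffer dst) {
        for (int i = 0; i < len; i++) dst.put(i, src.get(off + i));
        return len;
    }

    public static void main(String[] args) {
        MessageFlyweight fly = new MessageFlyweight();
        responseBuffer.put(0, (byte) 42);          // pretend one-byte message set arrived

        boolean compressed = true;                 // per message set, from its attributes
        if (compressed) {
            int n = decompressInto(responseBuffer, 0, 1, decompressedBuffer);
            fly.pointTo(decompressedBuffer, 0, n); // serve out of the scratch buffer
        } else {
            fly.pointTo(responseBuffer, 0, 1);     // serve in place, zero copy
        }
        System.out.println(fly.byteAt(0)); // 42
    }
}
```

Either way the consumer only ever touches the two preallocated buffers; the flyweight switches between them per message set.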
> > > > > > The first step would be > > > https://issues.apache.org/jira/browse/KAFKA-1895 > > > > > > This would delay the deserialization of ConsumerRecords and make it > > > basically a wrapper around the actual MemoryRecords chunks which are > > > basically ByteBuffer instances. We could then add an optional > > > ConsumerRecords param in poll, to allow you to hand back your > > > ConsumerRecords instance. Optimizing the ConsumerRecord instance reuse > > > would also be possible. > > > > > > That JIRA is actually useful irrespective of these optimizations though > > > because deferring the deserialization and decompression would allow you > > to > > > punt that work into a pool of processor threads. Since it is more common > > to > > > see the real bottleneck be application serialization this could be > > > valuable. > > > > > > WRT the object reuse I wouldn't be shocked to learn that you actually get > > > equivalent stuff out of the jvm allocator's own pooling and/or escape > > > analysis once we are doing the allocation on demand. So it would be good > > to > > > show a real performance improvement on the newer JVMs before deciding to > > go > > > this route. > > > > > > -Jay > > > > > > > > > On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com> > > > wrote: > > > > > > > Just a follow up - I have implemented a pretty hacky prototype. It's too > > > > unclean to share right now but I can clean it up if you are > > interested. I > > > > don't think it offers anything that people already don't know about > > > though. > > > > > > > > My prototype doesn't do any metadata requests yet but I have a > > flyweight > > > > builder/parser of the FetchRequest and the FetchResponse protocol that > > I > > > > based on the protocol page at > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol > > > > . 
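Jay's suggestion of punting the deferred work into a pool of processor threads might look roughly like this (hypothetical names; summing bytes stands in for the real decompress-and-deserialize step):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: the polling thread hands raw, undeserialized byte chunks to a
// worker pool, so deserialization/decompression happen off the poll thread.
public class DeferredProcessingDemo {
    static int process(ByteBuffer chunk) {
        int sum = 0;                           // stand-in for decompress + deserialize
        while (chunk.hasRemaining()) sum += chunk.get();
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService processors = Executors.newFixedThreadPool(2);
        // Stand-in for a raw MemoryRecords chunk handed back by poll().
        ByteBuffer rawChunk = ByteBuffer.wrap(new byte[]{1, 2, 3});
        Future<Integer> result = processors.submit(() -> process(rawChunk));
        System.out.println(result.get()); // 6
        processors.shutdown();
    }
}
```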
> > > > On my consumer thread I allocate two buffers per broker a small request > > > > buffer and a bigger response buffer. My assumption is that the > response > > > > buffer will be big enough that it is never too small for the > response. > > > If > > > > this assumption doesn't hold then we would have to reallocate a > bigger > > > > buffer. These are the steps I am following right now: > > > > > > > > i) Whenever a client makes a request I fill up the relevant request > > > > buffer(s) using my flyweight object instead of allocating an object > and > > > > then serializing it. I then write this buffer to the SocketChannel > > > > connected to the right Kafka broker. I don't yet handle the case > where > > > all > > > > of the request bytes could not be written synchronously. If I were > to > > > > handle that properly I would need to register WRITE interest on the > > > > SocketChannel on incomplete writes. > > > > > > > > ii) Whenever the client calls poll() I register read interest on the > > > socket > > > > channel using the selector and make a select call on the underlying > > > > selector. If the selector says that the socket channel is readable, I > > > read > > > > all the bytes possible into my response ByteBuffer. If I can read the > > > first > > > > 4 bytes I know how big the response is. So I wait till all response > > bytes > > > > have been read before attempting to parse it. This might take > multiple > > > poll > > > > calls. Once I have received the expected number of bytes I iterate > > > through > > > > the response ByteBuffer. There is no random access provided to > > the > > > > ByteBuffer since there are way too many variable length fields. This is a > > > > DISADVANTAGE of doing flyweight style parsing instead of > > deserialization > > > > into POJOs with indexed data structures like maps. 
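Step (ii) above, waiting for the 4-byte size prefix and then for the whole frame, can be sketched like this (frameComplete is a made-up helper, not part of the prototype; in the real client the bytes arrive from a non-blocking SocketChannel read):

```java
import java.nio.ByteBuffer;

// Sketch: accumulate bytes into the preallocated response buffer until the
// 4-byte size prefix says the entire response frame has arrived.
public class FrameAccumulator {
    // True once buffer positions 0..bytesRead hold a complete size-prefixed frame.
    static boolean frameComplete(ByteBuffer buffer, int bytesRead) {
        if (bytesRead < 4) return false;        // size prefix not fully read yet
        int frameLength = buffer.getInt(0);     // absolute read, position untouched
        return bytesRead >= 4 + frameLength;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putInt(0, 8);                           // response body is 8 bytes
        System.out.println(frameComplete(buf, 3));  // false: prefix incomplete
        System.out.println(frameComplete(buf, 10)); // false: only 6 body bytes so far
        System.out.println(frameComplete(buf, 12)); // true: 4 + 8 bytes received
    }
}
```

Because the check is cheap and stateless, it can be re-run after every read until it returns true, across multiple poll() calls.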
I could build > > > indexing > > > > with a non-allocating primitive hashmap like the koloboke one mapping > > > > Topic/Partition/Messages to offsets in the ByteBuffer if really > > required. > > > > For me the lack of random access has not been a problem at all. > > Iteration > > > > is done just by providing a ByteBuffer and offset (within ByteBuffer) > > > pair > > > > for each message. In my own application I wrap this ByteBuffer and > > offset > > > > pair in my own flyweight which knows how to decode the data. > > > > > > > > iii) Once an entire response has been iterated through I can re-use > > both > > > > the request as well as response buffers. > > > > > > > > I am sure this can be improved upon a lot. I allocate the following > > > before > > > > starting my client: > > > > i) A Direct ByteBuffer for requests. > > > > ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers are > > > > chosen so that they can fit the biggest request/responses we expect. > > > > iii) A flyweight that wraps the Request ByteBuffer and can be used to > > > > write a particular kind of request. So far I have only written a > > > flyweight > > > > for a FetchRequest. > > > > iv) A flyweight that wraps the Response ByteBuffer and can be used > > to > > > > iterate through the entire response including finding errors. There is > > no > > > > random access allowed right now. So far I have only written a flyweight > > > > parser for a FetchResponse. > > > > v) A flyweight for every type of application level message that I > > > > expect. Iterating through the response ByteBuffer using the flyweight > > in > > > > (iv) I get offsets into the ByteBuffer for each individual message. My > > > own > > > > messages work by using absolute position getLong(position), > > > > getShort(position) etc calls on a ByteBuffer, so this works out great > > for > > > > me. 
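The application-level message flyweight described above, reading its fields with absolute-position getters so iteration allocates nothing, looks roughly like this (the 10-byte layout of long id plus short kind is purely illustrative):

```java
import java.nio.ByteBuffer;

// Illustrative message flyweight: wrap() repoints it at a message's offset,
// and fields are read with absolute-position getters, so nothing is allocated
// per message.
class TinyMessageFlyweight {
    static final int ID_OFFSET = 0, KIND_OFFSET = 8, SIZE = 10;
    private ByteBuffer buffer;
    private int offset;

    TinyMessageFlyweight wrap(ByteBuffer buffer, int offset) {
        this.buffer = buffer;
        this.offset = offset;
        return this;
    }
    long id()    { return buffer.getLong(offset + ID_OFFSET); }
    short kind() { return buffer.getShort(offset + KIND_OFFSET); }
}

public class FlyweightDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(2 * TinyMessageFlyweight.SIZE);
        buf.putLong(0, 7L).putShort(8, (short) 1);    // message 0
        buf.putLong(10, 8L).putShort(18, (short) 2);  // message 1

        TinyMessageFlyweight fly = new TinyMessageFlyweight(); // allocated once
        for (int off = 0; off < buf.capacity(); off += TinyMessageFlyweight.SIZE) {
            fly.wrap(buf, off);
            System.out.println(fly.id() + ":" + fly.kind());
        }
    }
}
```

Absolute getLong(index)/getShort(index) calls never move the buffer's position, so the same buffer can be walked any number of times.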
We could alternatively provide an API that allocates a new > > > > ByteBuffer and copies the data for people who don't want the zero > > > > allocation access. > > > > > > > > Sadly in my profiling I noticed that the selector implementation in the > > > > JDK allocates > > > > but it seems like projects like Netty, Aeron have worked around this by > > > > using reflection to replace the underlying implementation to make it > > > > non-allocating. Other than that I have absolutely zero allocations past > > > the > > > > initial 4-5 allocations. I also have absolutely zero copies in user > space > > > > once the data lands from the socket onto the ByteBuffer. > > > > > > > > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <ra...@signalfuse.com > > > > > > wrote: > > > > > > > > > I had a few more thoughts on the new API. Currently we use kafka to > > > > > transfer really compact messages - around 25-35 bytes each. Our use > > > case > > > > is > > > > > a lot of messages but each very small. Will it be possible to do > the > > > > > following > > > > > to reuse a ConsumerRecord and the ConsumerRecords objects? We > employ > > > our > > > > > own binary encoding to encode our messages and it is encoded as a > > > > flyweight > > > > > in SBE/Cap'n'Proto style and can be consumed without any > > > > > decoding/deserialization step. Currently we use the SimpleConsumer > > and > > > > have > > > > > found the fact that it hands out ByteBuffers very useful instead > of a > > > > > mandatory deserialization into a POJO step. Even then through > memory > > > > > profiling we have found out that the ByteBuffers and the records > take > > > > more > > > > > space than the actual messages themselves. 
Ideally we can allocate > a > > > big > > > > > ByteBuffer (maybe kafka already does it) to receive our data and > then > > > > just > > > > > get some kind of a flyweight iterator on the ByteBuffer something > > like > > > > this: > > > > > > > > > > // Allocate a single ByteBuffer or have kafka allocate this > > internally. > > > > > Either way it would be very useful to just keep re-using this > buffer. > > > > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); > > > > > // Create a couple of flyweights. > > > > > ConsumerRecordIterator iterator = new ConsumerRecordIterator(); > > > > > ConsumerRecord record = new ConsumerRecord(); > > > > > // Subscribe to the topics we care about. > > > > > consumer.subscribe(TopicPartitions_I_am_interested_in); > > > > > while (running) { > > > > > // Use the buffer to get data from the server. Additionally > re-use > > > the > > > > > same iterator. > > > > > // Resets the iterator to point to the start of the ByteBuffer. > > > > > consumer.poll(buffer, iterator, timeout); > > > > > // Now the iterator has pointers to the ByteBuffer and is capable > > of > > > > > advancing the cursor to read every message. > > > > > while (iterator.hasNext()) { > > > > > // The consumer record is just a flyweight over a ByteBuffer > and > > is > > > > > adjusted to point to the start of the next record. > > > > > iterator.getNextInto(record); > > > > > // This is not a new ByteBuffer, it's just the big buffer with > its > > > > > position and limit adjusted so that we only read the current > record. > > > > > // Alternatively we could give it our own ByteBuffer, but the > > point > > > > > would be to not do a copy and instead adjust the supplied > > > > > // ByteBuffer's position and limit and pointer (through > > reflection) > > > > to > > > > > point to the right slice of the actual big ByteBuffer. 
> > > > > // This means that we cannot stash this buffer in a hash map or > > any > > > > > heap allocated structure since its contents keep changing as we > > > iterate. > > > > > ByteBuffer buffer = record.getUnderlying(); > > > > > process(buffer); // Process cannot keep a reference to the > > buffer > > > - > > > > > this is really the programmer's responsibility. > > > > > } > > > > > } > > > > > > > > > > Given how the new consumer is meant to be used from a single thread > > - I > > > > > think these optional non-allocating methods will be a great boon > for > > > anyone trying to save memory or prevent heap churn. It has exactly 0 > > > copies > > > > in > > > > > user space too which is great for performance. In our case since > the > > > > > messages are very tiny we end up spending more memory in all the > > > wrapper > > > > > objects than in the actual messages so this would be a game changer > > for > > > > us. > > > > > I'd love to be able to contribute if this seems sensible. Hopefully > > > these > > > > > non-allocating methods can co-exist with the allocating ones and > only > > > > users > > > > > who absolutely need to use them can make the trade-off of better > > > > > efficiency/performance for a slightly more error-prone and ugly > API. > > > > > > > > > > Thoughts? > > > > > > > > > > Thanks, > > > > > Rajiv > > > > > > > > > > > > > > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo > > <alter...@yahoo.com.invalid > > > > > > > > > wrote: > > > > > > > > > >> Hi all, The document is very beautiful and the Kafka release > version > > > for > > > > >> this will be? and what is the timeline? > > > > >> Thanks, Edwin > > > > >> > > > > >> > > > > >> On Friday, March 20, 2015 4:20 PM, Rajiv Kurian < > > > > >> ra...@signalfuse.com> wrote: > > > > >> > > > > >> > > > > >> Awesome - can't wait for this version to be out! 
> > > > >> > > > > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <jay.kr...@gmail.com> > > > > wrote: > > > > >> > > > > >> > The timeout in the poll call is more or less the timeout used by > > the > > > > >> > selector. So each call to poll will do socket activity on any > > ready > > > > >> > sockets, waiting for up to that time for a socket to be ready. > > There > > > > are > > > > >> no > > > > >> > longer any background threads involved in the consumer, all > > activity > > > > is > > > > >> > driven by the application thread(s). > > > > >> > > > > > >> > The max fetch request wait time is controlled with a config and > is > > > > >> > independent of the time given to poll. > > > > >> > > > > > >> > -Jay > > > > >> > > > > > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian < > > > ra...@signalfuse.com> > > > > >> > wrote: > > > > >> > > > > > >> > > I am trying to understand the semantics of the timeout > specified > > > in > > > > >> the > > > > >> > > poll method in > > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > > > > > > > http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html > > > > >> > > . > > > > >> > > Is this timeout a measure of how long the fetch request will > be > > > > >> parked on > > > > >> > > the broker waiting for a reply or is this more like the > timeout > > in > > > > >> > > selector.select(long timeout) i.e. the method will return with > > > > >> whatever > > > > >> > > data is there after waiting a maximum of timeout. Exposing the > > > > >> selector > > > > >> > > timeout will be very helpful for us because we want to put a > > tight > > > > >> bound > > > > >> > on > > > > >> > > how long we are ready to wait on the poll call. When this API > is > > > > >> > available > > > > >> > > we plan to use a single thread to get data from kafka, process > > > them > > > > as > > > > >> > well > > > > >> > > as run periodic jobs. 
For the periodic jobs to run we need a > > > > >> guarantee on > > > > >> > > how much time the poll call can take at most. > > > > >> > > > > > > >> > > Thanks! > > > > >> > > > > > > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian < > > > ra...@signalfuse.com > > > > > > > > > >> > > wrote: > > > > >> > > > > > > >> > > > Thanks! > > > > >> > > > > > > > >> > > > > > > > >> > > > On Thursday, March 19, 2015, Jay Kreps <jay.kr...@gmail.com > > > > > > wrote: > > > > >> > > > > > > > >> > > >> Err, here: > > > > >> > > >> > > > > >> > > >> > > > > >> > > > > > > >> > > > > > >> > > > > > > > > > > http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html > > > > >> > > >> > > > > >> > > >> -Jay > > > > >> > > >> > > > > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps < > > > jay.kr...@gmail.com> > > > > >> > wrote: > > > > >> > > >> > > > > >> > > >> > The current work in progress is documented here: > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian < > > > > >> ra...@signalfuse.com > > > > >> > > > > > > >> > > >> > wrote: > > > > >> > > >> > > > > > >> > > >> >> Is there a link to the proposed new consumer > non-blocking > > > API? > > > > >> > > >> >> > > > > >> > > >> >> Thanks, > > > > >> > > >> >> Rajiv > > > > >> > > >> >> > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > >> > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > -- > > -- Guozhang > > > -- -- Guozhang
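The single-threaded poll-plus-periodic-jobs loop discussed in the thread could be sketched like this (pollOnce is a stand-in for consumer.poll(timeout); the point is that each poll is bounded by the time remaining until the next job, so the job never starves):

```java
// Sketch: bound each poll by the time left until the next periodic job.
public class BoundedPollLoop {
    static long pollCalls = 0;
    static long jobRuns = 0;

    static void pollOnce(long timeoutMs) { pollCalls++; } // stand-in for consumer.poll
    static void runPeriodicJob()         { jobRuns++; }

    public static void main(String[] args) {
        final long periodMs = 50;
        long nextJobAt = System.currentTimeMillis() + periodMs;
        for (int i = 0; i < 3; i++) {                        // bounded iterations for the demo
            long timeout = Math.max(0, nextJobAt - System.currentTimeMillis());
            pollOnce(timeout);                               // returns within ~timeout ms
            if (System.currentTimeMillis() >= nextJobAt) {   // time for the periodic job?
                runPeriodicJob();
                nextJobAt += periodMs;
            }
        }
        System.out.println(pollCalls); // 3
    }
}
```

This only works if poll honors the selector-style timeout semantics Jay describes, i.e. it returns with whatever is ready once the timeout elapses.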