Hi Guozhang,

Thanks for the note. If we do not deserialize until the last moment, as Jay
suggested, we would not need extra buffers for deserialization. Unless we
need random access to messages, we can deserialize right at iteration time
and allocate objects only if the consumer actually requires deserialized
objects. Some applications like mine can just use the ByteBuffer contents
directly; in those cases we can take in a ByteBuffer, or a ConsumerRecord
which points to a ByteBuffer, and make it point to the right slice of the
response buffer. This ByteBuffer can be re-used over and over again.
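To make the slice re-use concrete, here is a minimal sketch (plain NIO with a hypothetical helper, not the actual consumer API) of one ByteBuffer view, created once, being re-pointed at each message's slice of a single response buffer:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FlyweightView {
    // Re-point `view` (a duplicate of the response buffer) at [offset, offset + length).
    static ByteBuffer pointAt(ByteBuffer view, int offset, int length) {
        view.limit(offset + length); // set limit first; position is clamped if needed
        view.position(offset);
        return view;
    }

    public static void main(String[] args) {
        // Pretend an entire fetch response landed in this buffer.
        ByteBuffer response =
            ByteBuffer.wrap("msg-one|msg-two".getBytes(StandardCharsets.US_ASCII));
        // One duplicate, allocated once: shares the bytes, has its own position/limit.
        ByteBuffer view = response.duplicate();

        pointAt(view, 0, 7); // first message
        byte[] first = new byte[view.remaining()];
        view.get(first);
        System.out.println(new String(first, StandardCharsets.US_ASCII));

        pointAt(view, 8, 7); // same view object, re-pointed at the second message
        byte[] second = new byte[view.remaining()];
        view.get(second);
        System.out.println(new String(second, StandardCharsets.US_ASCII));
    }
}
```

The caller only ever sees the one `view` object, so iteration allocates nothing per message.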

As for decompression - that will require an extra buffer allocated at
startup, but not new allocations per response. I imagine the steps would be
something like this:

i) Receive the entire response, which may include compressed messages, into
the ResponseBuffer. The ResponseBuffer is only allocated once.
ii) Once we have received an entire response, return an iterator over the
underlying ByteBuffer.
iii) For each message set, check whether it is compressed.
iv) If compressed, decompress only that message set in a streaming manner
and store the result in another preallocated buffer, the
DecompressedMessagesBuffer. When the consumer asks for the next message,
alter the flyweight it supplies to point to the correct slice of the
DecompressedMessagesBuffer. Do this till all decompressed messages from
that message set are consumed.
v) If the message set was not compressed, alter the flyweight to point to
the correct slice of the ResponseBuffer. Do this till all messages from
this message set are consumed.
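The steps above can be sketched roughly as follows. The framing here ([compressed-flag][length][payload] per set) and the buffer sizes are made up for illustration and are not the Kafka wire format, and java.util.zip stands in for whatever codec the broker actually used:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class TwoBufferIteration {
    // Compress a byte[] with DEFLATE; stands in for the broker-side codec.
    static byte[] deflate(byte[] in) {
        Deflater d = new Deflater();
        d.setInput(in);
        d.finish();
        byte[] buf = new byte[128];
        int n = d.deflate(buf);
        d.end();
        byte[] out = new byte[n];
        System.arraycopy(buf, 0, out, 0, n);
        return out;
    }

    public static void main(String[] args) throws Exception {
        byte[] plain = "plain-set".getBytes(StandardCharsets.US_ASCII);
        byte[] packed = deflate("compressed-set".getBytes(StandardCharsets.US_ASCII));

        ByteBuffer response = ByteBuffer.allocate(128); // "ResponseBuffer", allocated once
        response.put((byte) 0).putInt(plain.length).put(plain);
        response.put((byte) 1).putInt(packed.length).put(packed);
        response.flip();

        // "DecompressedMessagesBuffer": preallocated once, reused per compressed set.
        ByteBuffer decompressed = ByteBuffer.allocate(128);

        while (response.hasRemaining()) {
            boolean compressed = response.get() == 1;
            int len = response.getInt();
            ByteBuffer slice;
            if (compressed) {
                // Step iv): decompress just this set into the reusable buffer.
                Inflater inflater = new Inflater();
                inflater.setInput(response.array(), response.position(), len);
                int n = inflater.inflate(decompressed.array());
                inflater.end();
                decompressed.position(0).limit(n);
                response.position(response.position() + len);
                slice = decompressed; // flyweight points into DecompressedMessagesBuffer
            } else {
                // Step v): point straight into the ResponseBuffer slice.
                slice = response.duplicate();
                slice.limit(response.position() + len);
                response.position(response.position() + len);
            }
            byte[] msg = new byte[slice.remaining()];
            slice.get(msg);
            System.out.println(new String(msg, StandardCharsets.US_ASCII));
        }
    }
}
```

No per-message allocation happens on the iteration path; the two preallocated buffers are simply re-pointed at.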

So basically we switch between the two buffers when messages are
compressed, and otherwise stay on the ResponseBuffer. Another nice thing
about the lazy approach is that we can do CRC validation right before the
message gets consumed. This means the CRC algorithm will scan bytes that
are hot in cache (since we are iterating over a linear array of bytes), and
as soon as we have checked the bytes they will be consumed by the
application, so they stay hot in L1 cache.
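As a toy illustration of checking the CRC right at consumption time (CRC32 from java.util.zip; the offsets and where the expected CRC comes from are assumptions for the sketch, not the actual message format):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class LazyCrc {
    // Validate a message's CRC just before handing its bytes to the application,
    // while the bytes we just iterated past are still hot in cache.
    static boolean validate(ByteBuffer buf, int offset, int length, long expectedCrc) {
        CRC32 crc = new CRC32();
        crc.update(buf.array(), offset, length);
        return crc.getValue() == expectedCrc;
    }

    public static void main(String[] args) {
        byte[] payload = "hot-in-cache".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer response = ByteBuffer.wrap(payload);

        // In a real response the expected CRC would be read from the message header.
        CRC32 expected = new CRC32();
        expected.update(payload, 0, payload.length);

        System.out.println(validate(response, 0, payload.length, expected.getValue()));
        System.out.println(validate(response, 0, payload.length, expected.getValue() + 1));
    }
}
```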

All of this "theory crafting" is based on my currently incomplete
understanding of the Kafka protocol, but it seems like compression is per
message set, so we can stream through it. And since we can in general
iterate through the response buffer linearly, doing CRC validation right
before each message is consumed should fall out naturally.

Thanks,
Rajiv

On Sun, Mar 22, 2015 at 10:29 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Rajiv,
>
> A side note on re-using ByteBuffers: in the new consumer we do plan to add
> a memory management module that will try to reuse allocated buffers for
> fetch responses. But as Jay mentioned, for now de-serialization and
> de-compression are done inside the poll() call, which requires allocating
> another buffer to hold the de-serialized and de-compressed bytes. Hence,
> even with fetch response buffer management, today we would still need to
> allocate new buffers when compressed messages are delivered.
>
> Guozhang
>
> On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Zijing, the new consumer will be in the next release. We don't have a
> hard
> > date for this yet.
> >
> > Rajiv, I'm game if we can show a >= 20% performance improvement. It
> > certainly could be an improvement, but it might also be that the CRC
> > validation and compression dominate.
> >
> > The first step would be
> >   https://issues.apache.org/jira/browse/KAFKA-1895
> >
> > This would delay the deserialization of ConsumerRecords and make it
> > basically a wrapper around the actual MemoryRecords chunks which are
> > basically ByteBuffer instances. We could then add an optional
> > ConsumerRecords param in poll, to allow you to hand back your
> > ConsumerRecords instance. Optimizing the ConsumerRecord instance reuse
> > would also be possible.
> >
> > That JIRA is actually useful irrespective of these optimizations though
> > because deferring the deserialization and decompression would allow you
> to
> > punt that work into a pool of processor threads. Since it is more common
> to
> > see the real bottleneck be application serialization this could be
> > valuable.
> >
> > WRT the object reuse I wouldn't be shocked to learn that you actually get
> > equivalent stuff out of the jvm allocator's own pooling and/or escape
> > analysis once we are doing the allocation on demand. So it would be good
> to
> > show a real performance improvement on the newer JVMs before deciding to
> go
> > this route.
> >
> > -Jay
> >
> >
> > On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Just a follow up - I have implemented a pretty hacky prototype. It's
> > > too unclean to share right now, but I can clean it up if you are
> > > interested. I don't think it offers anything that people don't already
> > > know about though.
> > >
> > > My prototype doesn't do any metadata requests yet but I have a
> flyweight
> > > builder/parser of the FetchRequest and the FetchResponse protocol that
> I
> > > based on the protocol page at
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > .
> > > On my consumer thread I allocate two buffers per broker: a small
> > > request buffer and a bigger response buffer. My assumption is that the
> > > response buffer will be big enough for any response. If this assumption
> > > doesn't hold then we would have to reallocate a bigger buffer. These
> > > are the steps I am following right now:
> > >
> > > i) Whenever a client makes a request I fill up the relevant request
> > > buffer(s) using my flyweight object instead of allocating an object and
> > > then serializing it. I then write this buffer to the SocketChannel
> > > connected to the right Kafka broker. I don't yet handle the case where
> > > all of the request bytes could not be written synchronously. If I were
> > > to handle that properly I would need to register WRITE interest on the
> > > SocketChannel on incomplete writes.
> > >
> > > ii) Whenever the client calls poll() I register read interest on the
> > > socket channel using the selector and make a select call on the
> > > underlying selector. If the selector says that the socket channel is
> > > readable, I read all the bytes possible into my response ByteBuffer. If
> > > I can read the first 4 bytes I know how big the response is, so I wait
> > > till all response bytes have been read before attempting to parse it.
> > > This might take multiple poll calls. Once I have received the expected
> > > number of bytes I iterate through the response ByteBuffer. There is no
> > > random access provided to the ByteBuffer since there are way too many
> > > variable length fields. This is a DISADVANTAGE of doing flyweight style
> > > parsing instead of deserialization into POJOs with indexed data
> > > structures like maps. I could build indexing with a non-allocating
> > > primitive hashmap like the Koloboke one, mapping
> > > Topic/Partition/Messages to offsets in the ByteBuffer if really
> > > required. For me the lack of random access has not been a problem at
> > > all. Iteration is done just by providing a ByteBuffer and offset
> > > (within ByteBuffer) pair for each message. In my own application I wrap
> > > this ByteBuffer and offset pair in my own flyweight which knows how to
> > > decode the data.
> > >
> > > iii) Once an entire response has been iterated through I can re-use
> > > both the request and response buffers.
> > >
> > > I am sure this can be improved upon a lot. I allocate the following
> > > before starting my client:
> > >     i) A Direct ByteBuffer for requests.
> > >    ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers are
> > > chosen so that they can fit the biggest requests/responses we expect.
> > >   iii) A flyweight that wraps the request ByteBuffer and can be used to
> > > write a particular kind of request. So far I have only written a
> > > flyweight for a FetchRequest.
> > >    iv) A flyweight that wraps the response ByteBuffer and can be used
> > > to iterate through the entire response, including finding errors. There
> > > is no random access allowed right now. So far I have only written a
> > > flyweight parser for a FetchResponse.
> > >     v) A flyweight for every type of application level message that I
> > > expect. Iterating through the response ByteBuffer using the flyweight
> > > in (iv) I get offsets into the ByteBuffer for each individual message.
> > > My own messages work by using absolute position getLong(position),
> > > getShort(position) etc. calls on a ByteBuffer, so this works out great
> > > for me. We could alternatively provide an API that allocates a new
> > > ByteBuffer and copies the data for people who don't want the zero
> > > allocation access.
> > >
> > > Sadly, in my profiling I noticed that the selector implementation in
> > > the JDK allocates, but it seems projects like Netty and Aeron have
> > > worked around this by using reflection to replace the underlying
> > > implementation with a non-allocating one. Other than that I have
> > > absolutely zero allocations past the initial 4-5 allocations. I also
> > > have absolutely zero copies in user space once the data lands from the
> > > socket onto the ByteBuffer.
> > >
> > > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > >
> > > > I had a few more thoughts on the new API. Currently we use Kafka to
> > > > transfer really compact messages - around 25-35 bytes each. Our use
> > > > case is a lot of messages, each very small. Would it be possible to
> > > > do the following to reuse a ConsumerRecord and the ConsumerRecords
> > > > objects? We employ
> > our
> > > > own binary encoding to encode our messages and it is encoded as a
> > > flyweight
> > > > in SBE/Cap'n'Proto style and can be consumed without any
> > > > decoding/deserialization step. Currently we use the SimpleConsumer
> and
> > > have
> > > > found the fact that it hands out ByteBuffers very useful instead of a
> > > > mandatory deserialization into a POJO step. Even then through memory
> > > > profiling we have found out that the ByteBuffers and the records take
> > > more
> > > > space than the actual messages themselves. Ideally we can allocate a
> > big
> > > > ByteBuffer (maybe kafka already does it) to receive our data and then
> > > just
> > > > get some kind of a flyweight iterator on the ByteBuffer something
> like
> > > this:
> > > >
> > > > // Allocate a single ByteBuffer or have kafka allocate this
> internally.
> > > > Either way it would be very useful to just keep re-using this buffer.
> > > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > > // Create a couple of flyweights.
> > > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > > ConsumerRecord record = new ConsumerRecord();
> > > > // Subscribe to the topics we care about.
> > > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > > while (running) {
> > > >   // Use the buffer to get data from the server. Additionally  re-use
> > the
> > > > same iterator.
> > > >   // Resets the iterator to point to the start of the ByteBuffer.
> > > >   consumer.poll(buffer, iterator, timeout);
> > > >   // Now the iterator has pointers to the ByteBuffer and is capable
> of
> > > > advancing the cursor to read every message.
> > > >   while (iterator.hasNext()) {
> > > >     // The consumer record is just a flyweight over a ByteBuffer and
> is
> > > > adjusted to point to the start of the next record.
> > > >     iterator.getNextInto(record);
> > > >     // This is not a new ByteBuffer, its just the big buffer with its
> > > > position and limit adjusted so that we only read the current record.
> > > >     // Alternatively we could give it our own ByteBuffer, but the
> point
> > > > would be to not do a copy and instead adjust the supplied
> > > >     // ByteBuffer's position and limit and pointer (through
> reflection)
> > > to
> > > > point to the right slice of the actual big ByteBuffer.
> > > >     // This means that we cannot stash this buffer in a hash map or
> > > > any heap allocated structure since its contents keep changing as we
> > > > iterate.
> > > >     ByteBuffer buffer = record.getUnderlying();
> > > >     process(buffer);  // Process cannot keep a reference to the
> buffer
> > -
> > > > this is really the programmer's responsibility.
> > > >   }
> > > > }
> > > >
> > > > Given how the new consumer is meant to be used from a single thread
> - I
> > > > think these optional non-allocating methods will be a great boon for
> > > > anyone trying to save memory or prevent heap churn. It has exactly 0
> > copies
> > > in
> > > > user space too which is great for performance. In our case since the
> > > > messages are very tiny we end up spending more memory in all the
> > wrapper
> > > > objects than in the actual messages so this would be a game changer
> for
> > > us.
> > > > I'd love to be able to contribute if this seems sensible. Hopefully
> > these
> > > > non-allocating methods can co-exist with the allocating ones and only
> > > users
> > > > who absolutely need to use them can make the trade-off of better
> > > > efficiency/performance for a slightly more error-prone and ugly API.
> > > >
> > > > Thoughts?
> > > >
> > > > Thanks,
> > > > Rajiv
> > > >
> > > >
> > > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo
> <alter...@yahoo.com.invalid
> > >
> > > > wrote:
> > > >
> > > >> Hi all,
> > > >> The document is very beautiful. Which Kafka release will this be in,
> > > >> and what is the timeline?
> > > >> Thanks,
> > > >> Edwin
> > > >>
> > > >>
> > > >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> > > >> ra...@signalfuse.com> wrote:
> > > >>
> > > >>
> > > >>  Awesome - can't wait for this version to be out!
> > > >>
> > > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <jay.kr...@gmail.com>
> > > wrote:
> > > >>
> > > >> > The timeout in the poll call is more or less the timeout used by
> > > >> > the selector. So each call to poll will do socket activity on any
> > > >> > ready sockets, waiting for up to that time for a socket to be
> > > >> > ready. There are no longer any background threads involved in the
> > > >> > consumer; all activity is driven by the application thread(s).
> > > >> > driven by the application thread(s).
> > > >> >
> > > >> > The max fetch request wait time is controlled with a config and is
> > > >> > independent of the time given to poll.
> > > >> >
> > > >> > -Jay
> > > >> >
> > > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <
> > ra...@signalfuse.com>
> > > >> > wrote:
> > > >> >
> > > >> > > I am trying to understand the semantics of the timeout specified
> > in
> > > >> the
> > > >> > > poll method in
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > >> > > .
> > > >> > > Is this timeout a measure of how long the fetch request will be
> > > >> parked on
> > > >> > > the broker waiting for a reply or is this more like the timeout
> in
> > > >> > > selector.select(long timeout) i.e. the method will return with
> > > >> whatever
> > > >> > > data is there after waiting a maximum of timeout. Exposing the
> > > >> selector
> > > >> > > timeout will be very helpful for us because we want to put a
> tight
> > > >> bound
> > > >> > on
> > > >> > > how long we are ready to wait on the poll call. When this API is
> > > >> > available
> > > >> > > we plan to use a single thread to get data from kafka, process
> > them
> > > as
> > > >> > well
> > > >> > > as run periodic jobs. For the periodic jobs to run we need a
> > > >> guarantee on
> > > >> > > how much time the poll call can take at most.
> > > >> > >
> > > >> > > Thanks!
> > > >> > >
> > > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <
> > ra...@signalfuse.com
> > > >
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Thanks!
> > > >> > > >
> > > >> > > >
> > > >> > > > On Thursday, March 19, 2015, Jay Kreps <jay.kr...@gmail.com>
> > > wrote:
> > > >> > > >
> > > >> > > >> Err, here:
> > > >> > > >>
> > > >> > > >>
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > >> > > >>
> > > >> > > >> -Jay
> > > >> > > >>
> > > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <
> > jay.kr...@gmail.com>
> > > >> > wrote:
> > > >> > > >>
> > > >> > > >> > The current work in progress is documented here:
> > > >> > > >> >
> > > >> > > >> >
> > > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> > > >> ra...@signalfuse.com
> > > >> > >
> > > >> > > >> > wrote:
> > > >> > > >> >
> > > >> > > >> >> Is there a link to the proposed new consumer non-blocking
> > API?
> > > >> > > >> >>
> > > >> > > >> >> Thanks,
> > > >> > > >> >> Rajiv
> > > >> > > >> >>
> > > >> > > >> >
> > > >> > > >> >
> > > >> > > >>
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
