Hi Guozhang,

Yeah, the main motivation is to not require de-serialization but still
allow the consumer to de-serialize into objects if they really want to.
Another motivation for iterating over the ByteBuffer on the fly is that we
can prevent copies altogether. This has an added implication though. Since
we re-use the same response buffer continuously and iterate over it, the
iteration has to be done in a single-threaded manner, specifically on the
thread that did the poll. The workflow will look a bit like this:

i) Re-use the same request buffer to create a request and write to the
socket.
ii) On poll re-use the same response buffer to read in the response till
it is complete.
iii) When the response is complete, return an iterator over the response
ByteBuffer. The consumer must now consume the entire ByteBuffer on this
thread, since we use a single mutable iterator to go through the
ByteBuffer.
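
To make this concrete, here is a rough sketch of that workflow. Everything
here is hypothetical - encodeFetchRequest, ResponseIterator, the poll
signature etc. are made-up names to illustrate the shape of the API, not a
proposal for actual method names:

import java.nio.ByteBuffer;

// Allocated once up front and re-used for every request/response.
ByteBuffer requestBuffer = ByteBuffer.allocateDirect(64 * 1024);
ByteBuffer responseBuffer = ByteBuffer.allocateDirect(1024 * 1024);

while (running) {
  // i) Build the request in place and write it to the broker socket.
  requestBuffer.clear();
  encodeFetchRequest(requestBuffer); // hypothetical flyweight encoder
  requestBuffer.flip();
  channel.write(requestBuffer); // would need WRITE interest on partial writes

  // ii) poll() keeps reading into responseBuffer until a complete
  // response has arrived, then iii) returns an iterator over it.
  ResponseIterator it = consumer.poll(responseBuffer, timeoutMs);

  // The entire response must be consumed on this thread before the
  // buffer is re-used for the next response.
  while (it.hasNext()) {
    process(it.next());
  }
}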

It is trickier when we consider that during iteration the consumer might
send more Kafka requests and call poll further. We can maintain a pointer
to the end of the response ByteBuffer to allow more responses to stream in
and essentially use the response buffer as a circular buffer. So basically
there would be two pointers, iteration_pointer and network_append_pointer,
with the invariant iteration_pointer <= network_append_pointer. If the
network_append_pointer reaches the end of the buffer, then we cannot
accept any more data until the iteration_pointer has progressed further.
Since responses have to be contiguous, we cannot have a response straddle
the circular buffer's end. We'd have to detect this case and copy the
response to the beginning of the buffer so that it is contiguous. Though
this is a bit more complicated, it ensures that we don't have to move in
lockstep and can pipeline requests/responses. I have written something
like this for another application, and since it is all happening in a
single thread it is a bit easier and, I think, possible.
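
A minimal sketch of the two-pointer bookkeeping, just to illustrate the
invariant and the copy-to-the-front case (the class, field and method
names are all made up):

import java.nio.ByteBuffer;

class ResponseRing {
  final ByteBuffer buf = ByteBuffer.allocateDirect(1024 * 1024);
  int iterationPointer = 0;     // how far the consumer's iterator has read
  int networkAppendPointer = 0; // where the next network bytes land
  int responseStart = 0;        // start of the response currently arriving

  // Can we append `more` bytes of the in-flight response?
  boolean reserve(int more) {
    if (networkAppendPointer + more <= buf.capacity()) {
      return true; // still fits before the end of the buffer
    }
    int partial = networkAppendPointer - responseStart;
    if (partial + more <= iterationPointer) {
      // The response would straddle the end, but the iterator has already
      // consumed the front of the buffer: copy the partially received
      // response to offset 0 so it stays contiguous and keep appending.
      for (int i = 0; i < partial; i++) {
        buf.put(i, buf.get(responseStart + i));
      }
      responseStart = 0;
      networkAppendPointer = partial;
      return true;
    }
    return false; // must wait for the iterationPointer to progress
  }
}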

Like Jay suggested, if application-level deserialization is a bottleneck
that needs to be solved by passing slices of these ByteBuffers out to a
pool of threads, then this approach WILL NOT work, since we expect the
ByteBuffer to be linearly iterated in one go. If we want slices to be
passed to a pool of threads then copying to new ByteBuffers is probably
the only good option. For my application that is definitely not the case,
since deserialization is free and the cache friendliness of iterating over
a hot buffer trumps every other factor. But for applications with more
involved deserialization we can re-use the above framework. The lower-level
iterator (discussed in paragraphs 2 and 3) can be wrapped in a higher-level
iterator that just copies the bytes for each message to a new ByteBuffer
and hands them over. The low-level iterator is still responsible for
buffer management, and the higher-level iterator is just a plain old
consumer that copies the bytes into a new ByteBuffer and hands it to the
application. The application is now free to transfer these copies to other
threads for processing/deserialization.
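
Something like this, say - LowLevelIterator and its methods are again
made-up names standing in for whatever the real low-level iterator ends up
looking like:

import java.nio.ByteBuffer;

// Hypothetical low-level iterator: hands out a flyweight ByteBuffer view
// into the response buffer for each message.
interface LowLevelIterator {
  boolean hasNext();
  ByteBuffer next();
}

// Higher-level iterator that copies each message out of the hot response
// buffer. The wrapped low-level iterator still owns all buffer management.
class CopyingIterator {
  private final LowLevelIterator flyweights;

  CopyingIterator(LowLevelIterator flyweights) {
    this.flyweights = flyweights;
  }

  boolean hasNext() {
    return flyweights.hasNext();
  }

  ByteBuffer next() {
    ByteBuffer slice = flyweights.next(); // view into the response buffer
    ByteBuffer copy = ByteBuffer.allocate(slice.remaining());
    copy.put(slice); // copy the bytes out
    copy.flip();
    return copy;     // safe to stash or hand to another thread
  }
}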

Thanks for creating KAFKA-1326. Let me know what your thoughts are on this
proposed design.

Thanks,
Rajiv


On Tue, Mar 24, 2015 at 9:22 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Rajiv,
>
> Just want to clarify, that the main motivation for iterating over the byte
> buffer directly instead of iterating over the records is for not enforcing
> de-serialization, right? I think that can be done by passing the
> deserializer class info into the consumer record instead of putting the
> de-serialized objects into the consumer, and only do de-serialization
> on-the-fly when key/value fields are requested while exposing another API
> to return the underlying raw bytes from the record. What do you think?
>
> As for de-compression, today we allocate new buffer for each de-compressed
> message, and it may be required to do de-compression with re-useable buffer
> with memory control. I will create a ticket under KAFKA-1326 for that.
>
> Guozhang
>
>
>
>
> On Sun, Mar 22, 2015 at 1:22 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for the note. So if we do not deserialize till the last moment
> like
> > Jay suggested we would not need extra buffers for deserialization. Unless
> > we need random access to messages it seems like we can deserialize right
> at
> > the time of iteration and allocate objects only if the Consumer actually
> > requires deserialized objects. Some applications like mine can just
> utilize
> > the ByteBuffer contents and in those cases we can take in a ByteBuffer
> or a
> > ConsumerRecord which points to a ByteBuffer and make it point to the
> right
> > slice of our response buffer. This ByteBuffer can be re-used over and
> over
> > again.
> >
> > As for decompression - that will require an extra buffer allocated at
> > startup, but not new allocations per response. I imagine the steps would
> be
> > something like this:
> >
> > i) Receive entire response which includes possibly compressed messages on
> > ResponseBuffer. ResponseBuffer is only allocated once.
> > ii) Once we have received an entire response, return an iterator to the
> > underlying ByteBuffer.
> > iii) On each message set check if it is compressed or not.
> > iv) If compressed, decompress only that message set in a streaming manner
> > and store the result in another preallocated buffer
> > DecompressedMessagesBuffer. When the consumer asks for the next message
> > alter the flyweight that it supplies you to point to the correct slice in
> > the DecompressedMessagesBuffer. Do this till all decompressed messages
> from
> > that message set are consumed.
> > v) If the message set was not compressed, then alter the flyweight to
> point
> > to the correct slice in the ResponseBuffer. Do this till all messages
> from
> > this message set are consumed.
> >
> > So basically we will switch between the two buffers when messages are
> > compressed or otherwise stay only on the ResponseBuffer. With the lazy
> > approach another nice thing is we can do CRC validation right before the
> > message gets consumed too, which means that the CRC algorithm will scan
> > some bytes which will be hot in cache (since we are iterating a linear
> > array of bytes) and as soon as we have checked the bytes they will be
> > consumed by the application which means they will stay hot in L1 cache.
> >
> > All of this "theory crafting" is based on my currently incomplete
> > understanding of the kafka protocol but it seems like compression is per
> > message set so we can stream through. Also since in general we can iterate
> > through the response buffer, we can do CRC validation right before the
> > message is consumed.
> >
> > Thanks,
> > Rajiv
> >
> > On Sun, Mar 22, 2015 at 10:29 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Rajiv,
> > >
> > > A side note for re-using ByteBuffer: in the new consumer we do plan to
> > add
> > > some memory management module such that it will try to reuse allocated
> > > buffer for fetch responses. But as Jay mentioned, for now inside the
> > poll()
> > > call de-serialization and de-compression is done, which requires us to
> > allocate
> > > another buffer to write the de-serialized and de-compressed bytes,
> hence
> > > even with fetch response buffer management today we still need to
> > allocate
> > > new buffers if compressed messages are delivered.
> > >
> > > Guozhang
> > >
> > > On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> > >
> > > > Zijing, the new consumer will be in the next release. We don't have a
> > > hard
> > > > date for this yet.
> > > >
> > > > Rajiv, I'm game if we can show a >= 20% performance improvement. It
> > > > certainly could be an improvement, but it might also be that the CRC
> > > > validation and compression dominate.
> > > >
> > > > The first step would be
> > > >   https://issues.apache.org/jira/browse/KAFKA-1895
> > > >
> > > > This would delay the deserialization of ConsumerRecords and make it
> > > > basically a wrapper around the actual MemoryRecords chunks which are
> > > > basically ByteBuffer instances. We could then add an optional
> > > > ConsumerRecords param in poll, to allow you to hand back your
> > > > ConsumerRecords instance. Optimizing the ConsumerRecord instance
> reuse
> > > > would also be possible.
> > > >
> > > > That JIRA is actually useful irrespective of these optimizations
> though
> > > > because deferring the deserialization and decompression would allow
> you
> > > to
> > > > punt that work into a pool of processor threads. Since it is more
> > common
> > > to
> > > > see the real bottleneck be application serialization this could be
> > > > valuable.
> > > >
> > > > WRT the object reuse I wouldn't be shocked to learn that you actually
> > get
> > > > equivalent stuff out of the jvm allocator's own pooling and/or escape
> > > > analysis once we are doing the allocation on demand. So it would be
> > good
> > > to
> > > > show a real performance improvement on the newer JVMs before deciding
> > to
> > > go
> > > > this route.
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > > wrote:
> > > >
> > > > > Just a follow up - I have implemented a pretty hacky prototype. It's
> > too
> > > > > unclean to share right now but I can clean it up if you are
> > > interested. I
> > > > > don't think it offers anything that people don't already know about
> > > > though.
> > > > >
> > > > > My prototype doesn't do any metadata requests yet but I have a
> > > flyweight
> > > > > builder/parser of the FetchRequest and the FetchResponse protocol
> > that
> > > I
> > > > > based on the protocol page at
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > > .
> > > > > On my consumer thread I allocate two buffers per broker: a small
> > request
> > > > > buffer and a bigger response buffer. My assumption is that the
> > response
> > > > > buffer will be big enough that it is never too small for the
> > response.
> > > > If
> > > > > this assumption doesn't hold then we would have to reallocate a
> > bigger
> > > > > buffer. These are the steps I am following right now:
> > > > >
> > > > > i) Whenever a client makes a request I fill up the relevant request
> > > > > buffer(s) using my flyweight object instead of allocating an object
> > and
> > > > > then serializing it. I then write this buffer to the SocketChannel
> > > > > connected to the right Kafka broker. I don't yet handle the case
> > where
> > > > all
> > > > > of the request bytes could not be written synchronously. If I were
> > to
> > > > > handle that properly I would need to register WRITE interest on the
> > > > > SocketChannel on incomplete writes.
> > > > >
> > > > > ii) Whenever the client calls poll() I register read interest on
> the
> > > > socket
> > > > > channel using the selector and make a select call on the underlying
> > > > > selector. If the selector says that the socket channel is
> readable, I
> > > > read
> > > > > all the bytes possible into my response ByteBuffer. If I can read
> the
> > > > first
> > > > > 4 bytes I know how big the response is. So I wait till all response
> > > bytes
> > > > > have been read before attempting to parse it. This might take
> > multiple
> > > > poll
> > > > > calls. Once I have received the expected number of bytes I iterate
> > > > through
> > > > > the response ByteBuffer. There is no random access provided
> to
> > > the
> > > > > ByteBuffer since there are way too many variable length fields.
> This is
> > a
> > > > > DISADVANTAGE of doing flyweight style parsing instead of
> > > deserialization
> > > > > into POJOs with indexed data structures like maps.  I could build
> > > > indexing
> > > > > with a non-allocating primitive hashmap like the koloboke one
> mapping
> > > > > Topic/Partition/Messages to offsets in the ByteBuffer if really
> > > required.
> > > > > For me the lack of random access has not been a problem at all.
> > > Iteration
> > > > > is done just by providing a ByteBuffer and offset (within
> ByteBuffer)
> > > > pair
> > > > > for each message. In my own application I wrap this ByteBuffer and
> > > offset
> > > > > pair in my own flyweight which knows how to decode the data.
> > > > >
> > > > > iii) Once an entire response has been iterated through I can re-use
> > > both
> > > > > the request as well as response buffers.
> > > > >
> > > > > I am sure this can be improved upon a lot.  I allocate the
> following
> > > > before
> > > > > starting my client:
> > > > >     i) A Direct ByteBuffer for requests.
> > > > >    ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers
> > are
> > > > > chosen so that they can fit the biggest request/responses we
> expect.
> > > > >    iii) A flyweight that wraps the Request ByteBuffer and can be
> used
> > to
> > > > > write a particular kind of request.  So far I have only written a
> > > > flyweight
> > > > > for a FetchRequest.
> > > > >   iv) A flyweight that wraps the Response ByteBuffer and can be
> used
> > > to
> > > > > iterate through the entire response including finding errors. There
> > is
> > > no
> > > > > random access allowed right now. So far I have only written a
> > flyweight
> > > > > parser for a FetchResponse.
> > > > >   v) A flyweight for every type of application level message that
> I
> > > > > expect. Iterating through the response ByteBuffer using the
> flyweight
> > > in
> > > > > (iii) I get offsets into the ByteBuffer for each individual
> message.
> > My
> > > > own
> > > > > messages work by using absolute position getLong(position),
> > > > > getShort(position) etc calls on a ByteBuffer, so this works out
> great
> > > for
> > > > > me. We could alternatively provide an API that allocates a new
> > > > > ByteBuffer and copies the data for people who don't want the zero
> > > > > allocation access.
> > > > >
> > > > > Sadly in my profiling I noticed that the selector implementation in the
> > > > > JDK allocates
> > > > > but it seems like projects like Netty, Aeron have worked around
> this
> > by
> > > > > using reflection to replace the underlying implementation to make
> it
> > > > > non-allocating. Other than that I have absolutely zero allocations
> > past
> > > > the
> > > > > initial 4-5 allocations. I also have absolutely zero copies in user
> > > space
> > > > > once the data lands from the socket onto the ByteBuffer.
> > > > >
> > > > > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <
> ra...@signalfuse.com
> > >
> > > > > wrote:
> > > > >
> > > > > > I had a few more thoughts on the new API. Currently we use kafka
> to
> > > > > > transfer really compact messages - around 25-35 bytes each. Our
> use
> > > > case
> > > > > is
> > > > > > a lot of messages but each very small. Will it be possible to do
> > the
> > > > > > following
> > > > > > to reuse a ConsumerRecord and the ConsumerRecords objects? We
> > employ
> > > > our
> > > > > > own binary encoding to encode our messages and it is encoded as a
> > > > > flyweight
> > > > > > in SBE/Cap'n'Proto style and can be consumed without any
> > > > > > decoding/deserialization step. Currently we use the
> SimpleConsumer
> > > and
> > > > > have
> > > > > > found the fact that it hands out ByteBuffers very useful instead
> > of a
> > > > > > mandatory deserialization into a POJO step. Even then through
> > memory
> > > > > > profiling we have found out that the ByteBuffers and the records
> > take
> > > > > more
> > > > > > space than the actual messages themselves. Ideally we can
> allocate
> > a
> > > > big
> > > > > > ByteBuffer (maybe kafka already does it) to receive our data and
> > then
> > > > > just
> > > > > > get some kind of a flyweight iterator on the ByteBuffer something
> > > like
> > > > > this:
> > > > > >
> > > > > > // Allocate a single ByteBuffer or have kafka allocate this
> > > internally.
> > > > > > Either way it would be very useful to just keep re-using this
> > buffer.
> > > > > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > > > > // Create a couple of flyweights.
> > > > > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > > > > ConsumerRecord record = new ConsumerRecord();
> > > > > > // Subscribe to the topics we care about.
> > > > > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > > > > while (running) {
> > > > > >   // Use the buffer to get data from the server. Additionally
> > re-use
> > > > the
> > > > > > same iterator.
> > > > > >   // Resets the iterator to point to the start of the ByteBuffer.
> > > > > >   consumer.poll(buffer, iterator, timeout);
> > > > > >   // Now the iterator has pointers to the ByteBuffer and is
> capable
> > > of
> > > > > > advancing the cursor to read every message.
> > > > > >   while (iterator.hasNext()) {
> > > > > >     // The consumer record is just a flyweight over a ByteBuffer
> > and
> > > is
> > > > > > adjusted to point to the start of the next record.
> > > > > >     iterator.getNextInto(record);
> > > > > >     // This is not a new ByteBuffer, it's just the big buffer with
> > its
> > > > > > position and limit adjusted so that we only read the current
> > record.
> > > > > >     // Alternatively we could give it our own ByteBuffer, but the
> > > point
> > > > > > would be to not do a copy and instead adjust the supplied
> > > > > >     // ByteBuffer's position and limit and pointer (through
> > > reflection)
> > > > > to
> > > > > > point to the right slice of the actual big ByteBuffer.
> > > > > >     // This means that we cannot stash this buffer in a hash map
> or
> > > any
> > > > > > heap allocated structure since its contents keep changing as we
> > > > iterate.
> > > > > >     ByteBuffer buffer = record.getUnderlying();
> > > > > >     process(buffer);  // Process cannot keep a reference to the
> > > buffer
> > > > -
> > > > > > this is really the programmer's responsibility.
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > > Given how the new consumer is meant to be used from a single
> thread
> > > - I
> > > > > > think these optional non-allocating methods will be a great boon
> > for
> > > > any
> > > > > > one trying to save memory or prevent heap churn. It has exactly 0
> > > > copies
> > > > > in
> > > > > > user space too which is great for performance. In our case since
> > the
> > > > > > messages are very tiny we end up spending more memory in all the
> > > > wrapper
> > > > > > objects than in the actual messages so this would be a game
> changer
> > > for
> > > > > us.
> > > > > > I'd love to be able to contribute if this seems sensible.
> Hopefully
> > > > these
> > > > > > non-allocating methods can co-exist with the allocating ones and
> > only
> > > > > users
> > > > > > who absolutely need to use them can make the trade-off of better
> > > > > > efficiency/performance for a slightly more error-prone and ugly
> > API.
> > > > > >
> > > > > > Thoughts?
> > > > > >
> > > > > > Thanks,
> > > > > > Rajiv
> > > > > >
> > > > > >
> > > > > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo
> > > <alter...@yahoo.com.invalid
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Hi all, the document is very beautiful and the Kafka release
> > version
> > > > for
> > > > > >> this will be? and what is the timeline?
> > > > > >> Thanks,
> > > > > >> Edwin
> > > > > >>
> > > > > >>
> > > > > >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> > > > > >> ra...@signalfuse.com> wrote:
> > > > > >>
> > > > > >>
> > > > > >>  Awesome - can't wait for this version to be out!
> > > > > >>
> > > > > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <
> jay.kr...@gmail.com>
> > > > > wrote:
> > > > > >>
> > > > > >> > The timeout in the poll call is more or less the timeout used
> by
> > > the
> > > > > >> > selector. So each call to poll will do socket activity on any
> > > ready
> > > > > >> > sockets, waiting for up to that time for a socket to be ready.
> > > There
> > > > > are
> > > > > >> no
> > > > > >> > longer any background threads involved in the consumer, all
> > > activity
> > > > > is
> > > > > >> > driven by the application thread(s).
> > > > > >> >
> > > > > >> > The max fetch request wait time is controlled with a config
> and
> > is
> > > > > >> > independent of the time given to poll.
> > > > > >> >
> > > > > >> > -Jay
> > > > > >> >
> > > > > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <
> > > > ra...@signalfuse.com>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > I am trying to understand the semantics of the timeout
> > specified
> > > > in
> > > > > >> the
> > > > > >> > > poll method in
> > > > > >> > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > >> > > .
> > > > > >> > > Is this timeout a measure of how long the fetch request will
> > be
> > > > > >> parked on
> > > > > >> > > the broker waiting for a reply or is this more like the
> > timeout
> > > in
> > > > > >> > > selector.select(long timeout) i.e. the method will return
> with
> > > > > >> whatever
> > > > > >> > > data is there after waiting a maximum of timeout. Exposing
> the
> > > > > >> selector
> > > > > >> > > timeout will be very helpful for us because we want to put a
> > > tight
> > > > > >> bound
> > > > > >> > on
> > > > > >> > > how long we are ready to wait on the poll call. When this
> API
> > is
> > > > > >> > available
> > > > > >> > > we plan to use a single thread to get data from kafka,
> process
> > > > them
> > > > > as
> > > > > >> > well
> > > > > >> > > as run periodic jobs. For the periodic jobs to run we need a
> > > > > >> guarantee on
> > > > > >> > > how much time the poll call can take at most.
> > > > > >> > >
> > > > > >> > > Thanks!
> > > > > >> > >
> > > > > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <
> > > > ra...@signalfuse.com
> > > > > >
> > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > > > Thanks!
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > On Thursday, March 19, 2015, Jay Kreps <
> jay.kr...@gmail.com
> > >
> > > > > wrote:
> > > > > >> > > >
> > > > > >> > > >> Err, here:
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > >> > > >>
> > > > > >> > > >> -Jay
> > > > > >> > > >>
> > > > > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <
> > > > jay.kr...@gmail.com>
> > > > > >> > wrote:
> > > > > >> > > >>
> > > > > >> > > >> > The current work in progress is documented here:
> > > > > >> > > >> >
> > > > > >> > > >> >
> > > > > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> > > > > >> ra...@signalfuse.com
> > > > > >> > >
> > > > > >> > > >> > wrote:
> > > > > >> > > >> >
> > > > > >> > > >> >> Is there a link to the proposed new consumer
> > non-blocking
> > > > API?
> > > > > >> > > >> >>
> > > > > >> > > >> >> Thanks,
> > > > > >> > > >> >> Rajiv
> > > > > >> > > >> >>
> > > > > >> > > >> >
> > > > > >> > > >> >
> > > > > >> > > >>
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
