Hi Rajiv,

Just to clarify: the main motivation for iterating over the byte buffer
directly, instead of iterating over the records, is to avoid forcing
de-serialization, right? I think that can be achieved by passing the
deserializer class info into the consumer record instead of putting the
de-serialized objects into it, doing de-serialization on the fly only when
the key/value fields are requested, and exposing another API to return the
underlying raw bytes from the record. What do you think?
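A minimal sketch of what such a lazily-deserializing record might look like. The names (LazyConsumerRecord, the Function-based deserializer, rawValueBytes) are illustrative assumptions, not the actual Kafka client API:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch: deserialization happens only when the typed value
// is requested; raw bytes remain available without any decoding.
final class LazyConsumerRecord<V> {
    private final byte[] rawValue;
    private final Function<byte[], V> valueDeserializer;
    private V value; // lazily materialized

    LazyConsumerRecord(byte[] rawValue, Function<byte[], V> valueDeserializer) {
        this.rawValue = rawValue;
        this.valueDeserializer = valueDeserializer;
    }

    // Deserialize only when the caller actually asks for the typed value.
    V value() {
        if (value == null) {
            value = valueDeserializer.apply(rawValue);
        }
        return value;
    }

    // Zero-copy escape hatch for callers that can work on raw bytes.
    byte[] rawValueBytes() {
        return rawValue;
    }
}

public class LazyRecordDemo {
    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        LazyConsumerRecord<String> record =
            new LazyConsumerRecord<>(raw, b -> new String(b, StandardCharsets.UTF_8));
        // Raw access never triggers deserialization.
        System.out.println(record.rawValueBytes().length); // 5
        // Typed access deserializes on first call only.
        System.out.println(record.value()); // hello
    }
}
```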

As for de-compression, today we allocate a new buffer for each
de-compressed message, and we may need to do de-compression into a reusable
buffer with memory control. I will create a ticket under KAFKA-1326 for that.
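The idea behind that ticket, decompressing into a buffer that is allocated once and reused, can be sketched with java.util.zip (the buffer sizes and helper name are assumptions for the example):

```java
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Sketch of decompressing into a preallocated, reused output buffer
// rather than allocating a new buffer per compressed message set.
public class ReusableInflateDemo {
    // Decompress into a caller-supplied, reused output buffer; returns the
    // number of decompressed bytes written.
    static int inflateInto(Inflater inflater, byte[] compressed, int compressedLen,
                           byte[] out) throws DataFormatException {
        inflater.reset(); // reuse the same Inflater across message sets
        inflater.setInput(compressed, 0, compressedLen);
        return inflater.inflate(out);
    }

    public static void main(String[] args) throws Exception {
        // Preallocated once and reused for every message set; sized for the
        // largest decompressed message set we expect.
        byte[] decompressed = new byte[1 << 16];
        Inflater inflater = new Inflater();

        byte[] original = "some compressed message set payload".getBytes("UTF-8");
        byte[] compressed = new byte[1 << 16];
        Deflater deflater = new Deflater();
        deflater.setInput(original);
        deflater.finish();
        int compressedLen = deflater.deflate(compressed);

        int n = inflateInto(inflater, compressed, compressedLen, decompressed);
        System.out.println(new String(decompressed, 0, n, "UTF-8"));
    }
}
```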

Guozhang




On Sun, Mar 22, 2015 at 1:22 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hi Guozhang,
>
> Thanks for the note. So if we do not deserialize till the last moment like
> Jay suggested we would not need extra buffers for deserialization. Unless
> we need random access to messages it seems like we can deserialize right at
> the time of iteration and allocate objects only if the Consumer actually
> requires deserialized objects. Some applications like mine can just utilize
> the ByteBuffer contents and in those cases we can take in a ByteBuffer or a
> ConsumerRecord which points to a ByteBuffer and make it point to the right
> slice of our response buffer. This ByteBuffer can be re-used over and over
> again.
>
> As for decompression - that will require an extra buffer allocated at
> startup, but not new allocations per response. I imagine the steps would be
> something like this:
>
> i) Receive entire response which includes possibly compressed messages on
> ResponseBuffer. ResponseBuffer is only allocated once.
> ii) Once we have received an entire response, return an iterator to the
> underlying ByteBuffer.
> iii) On each message set check if it is compressed or not.
> iv) If compressed, decompress only that message set in a streaming manner
> and store the result in another preallocated buffer
> DecompressedMessagesBuffer. When the consumer asks for the next message
> alter the flyweight that it supplies you to point to the correct slice in
> the DecompressedMessagesBuffer. Do this till all decompressed messages from
> that message set are consumed.
> v) If the message set was not compressed, then alter the flyweight to point
> to the correct slice in the ResponseBuffer. Do this till all messages from
> this message set are consumed.
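The buffer switching in steps iv and v above can be sketched roughly as follows. MessageFlyweight, the wrap method, and the compression check are illustrative names for the example, not actual Kafka client classes:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of steps iv/v: point a single reusable flyweight at
// a slice of either the response buffer or the decompressed buffer.
final class MessageFlyweight {
    ByteBuffer buffer; // which backing buffer this message lives in
    int offset;        // absolute offset of the message within it
    int length;

    void wrap(ByteBuffer buffer, int offset, int length) {
        this.buffer = buffer;
        this.offset = offset;
        this.length = length;
    }
}

public class BufferSwitchDemo {
    public static void main(String[] args) {
        ByteBuffer responseBuffer = ByteBuffer.allocateDirect(1024);             // allocated once
        ByteBuffer decompressedMessagesBuffer = ByteBuffer.allocateDirect(1024); // allocated once
        MessageFlyweight flyweight = new MessageFlyweight();

        boolean messageSetCompressed = true; // would be read from the message set attributes
        if (messageSetCompressed) {
            // Step iv: stream-decompress into the preallocated buffer, then
            // walk the flyweight over slices of it.
            flyweight.wrap(decompressedMessagesBuffer, 0, 64);
        } else {
            // Step v: no copy at all, slice straight out of the response buffer.
            flyweight.wrap(responseBuffer, 0, 64);
        }
        System.out.println(flyweight.buffer == decompressedMessagesBuffer); // true
    }
}
```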
>
> So basically we will switch between the two buffers when message sets are
> compressed, and otherwise stay on the ResponseBuffer. Another nice thing
> about the lazy approach is that we can do CRC validation right before the
> message gets consumed. This means the CRC algorithm will scan bytes that
> are hot in cache (since we are iterating over a linear array of bytes),
> and as soon as we have checked the bytes they will be consumed by the
> application, so they will stay hot in L1 cache.
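The lazy CRC check described above might look like this sketch. The offsets and method names are assumptions for illustration, not the actual Kafka message layout:

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Sketch of validating a message's CRC right before it is handed to the
// application, scanning bytes that are already hot in cache.
public class LazyCrcDemo {
    static boolean crcMatches(ByteBuffer buffer, int payloadOffset, int payloadLength,
                              long storedCrc) {
        CRC32 crc = new CRC32();
        ByteBuffer slice = buffer.duplicate(); // no copy of the underlying bytes
        slice.position(payloadOffset);
        slice.limit(payloadOffset + payloadLength);
        crc.update(slice); // CRC32.update(ByteBuffer) is available since Java 8
        return crc.getValue() == storedCrc;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        byte[] payload = "tiny message".getBytes();
        buffer.put(payload);
        CRC32 expected = new CRC32();
        expected.update(payload, 0, payload.length);
        System.out.println(crcMatches(buffer, 0, payload.length, expected.getValue())); // true
    }
}
```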
>
> All of this "theory crafting" is based on my currently incomplete
> understanding of the kafka protocol but it seems like compression is per
> message set, so we can stream through it. Also, since in general we iterate
> through the response buffer, we can do CRC validation right before the
> message is consumed.
>
> Thanks,
> Rajiv
>
> On Sun, Mar 22, 2015 at 10:29 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Rajiv,
> >
> > A side note on re-using ByteBuffers: in the new consumer we do plan to add
> > some memory management module so that it will try to reuse the allocated
> > buffer for fetch responses. But as Jay mentioned, for now de-serialization
> > and de-compression are done inside the poll() call, which requires
> > allocating another buffer to write the de-serialized and de-compressed
> > bytes; hence even with fetch response buffer management, today we would
> > still need to allocate new buffers if compressed messages are delivered.
> >
> > Guozhang
> >
> > On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > Zijing, the new consumer will be in the next release. We don't have a
> > > hard date for this yet.
> > >
> > > Rajiv, I'm game if we can show a >= 20% performance improvement. It
> > > certainly could be an improvement, but it might also be that the CRC
> > > validation and compression dominate.
> > >
> > > The first step would be
> > >   https://issues.apache.org/jira/browse/KAFKA-1895
> > >
> > > This would delay the deserialization of ConsumerRecords and make it
> > > basically a wrapper around the actual MemoryRecords chunks which are
> > > basically ByteBuffer instances. We could then add an optional
> > > ConsumerRecords param in poll, to allow you to hand back your
> > > ConsumerRecords instance. Optimizing the ConsumerRecord instance reuse
> > > would also be possible.
> > >
> > > That JIRA is actually useful irrespective of these optimizations, though,
> > > because deferring the deserialization and decompression would allow you to
> > > punt that work onto a pool of processor threads. Since it is more common to
> > > see the real bottleneck be application serialization, this could be
> > > valuable.
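The hand-off pattern that deferred deserialization would enable can be sketched like this. RawRecords and processAsync are illustrative stand-ins for the lazy ConsumerRecords wrapper, not actual Kafka classes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: the polling thread hands still-serialized bytes to a pool, so
// deserialization happens off the consumer thread.
public class DeferredDeserialization {
    static final class RawRecords {
        final byte[] bytes; // still serialized, possibly still compressed
        RawRecords(byte[] bytes) { this.bytes = bytes; }
    }

    static Future<String> processAsync(ExecutorService processors, RawRecords chunk) {
        // Deserialization runs on a processor thread, not the poll thread.
        return processors.submit(() -> new String(chunk.bytes, "UTF-8"));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService processors = Executors.newFixedThreadPool(4);
        RawRecords chunk = new RawRecords("payload".getBytes("UTF-8"));
        System.out.println(processAsync(processors, chunk).get()); // payload
        processors.shutdown();
    }
}
```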
> > >
> > > WRT the object reuse, I wouldn't be shocked to learn that you actually get
> > > equivalent behavior out of the JVM allocator's own pooling and/or escape
> > > analysis once we are doing the allocation on demand. So it would be good to
> > > show a real performance improvement on the newer JVMs before deciding to go
> > > this route.
> > >
> > > -Jay
> > >
> > >
> > > On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > >
> > > > Just a follow up - I have implemented a pretty hacky prototype. It's too
> > > > unclean to share right now, but I can clean it up if you are interested. I
> > > > don't think it offers anything that people don't already know about,
> > > > though.
> > > >
> > > > My prototype doesn't do any metadata requests yet, but I have a flyweight
> > > > builder/parser for the FetchRequest and FetchResponse protocol that I
> > > > based on the protocol page at
> > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > On my consumer thread I allocate two buffers per broker: a small request
> > > > buffer and a bigger response buffer. My assumption is that the response
> > > > buffer will be big enough that it is never too small for the response. If
> > > > this assumption doesn't hold, then we would have to reallocate a bigger
> > > > buffer. These are the steps I am following right now:
> > > >
> > > > i) Whenever a client makes a request I fill up the relevant request
> > > > buffer(s) using my flyweight object instead of allocating an object and
> > > > then serializing it. I then write this buffer to the SocketChannel
> > > > connected to the right Kafka broker. I don't yet handle the case where
> > > > all of the request bytes could not be written synchronously. If I were to
> > > > handle that properly I would need to register WRITE interest on the
> > > > SocketChannel on incomplete writes.
> > > >
> > > > ii) Whenever the client calls poll() I register read interest on the
> > > > socket channel using the selector and make a select call on the underlying
> > > > selector. If the selector says that the socket channel is readable, I read
> > > > all the bytes possible into my response ByteBuffer. Once I can read the
> > > > first 4 bytes I know how big the response is, so I wait till all response
> > > > bytes have been read before attempting to parse it. This might take
> > > > multiple poll calls. Once I have received the expected number of bytes I
> > > > iterate through the response ByteBuffer. There is no random access
> > > > provided to the ByteBuffer since there are way too many variable length
> > > > fields. This is a DISADVANTAGE of doing flyweight style parsing instead of
> > > > deserialization into POJOs with indexed data structures like maps. I could
> > > > build indexing with a non-allocating primitive hashmap like the koloboke
> > > > one, mapping Topic/Partition/Messages to offsets in the ByteBuffer if
> > > > really required. For me the lack of random access has not been a problem
> > > > at all. Iteration is done just by providing a ByteBuffer and offset
> > > > (within the ByteBuffer) pair for each message. In my own application I
> > > > wrap this ByteBuffer and offset pair in my own flyweight which knows how
> > > > to decode the data.
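The "do we have the whole size-prefixed response yet?" check from step ii can be sketched like this. The class name is an assumption and the channel read itself is elided; only the framing logic is shown:

```java
import java.nio.ByteBuffer;

// Sketch: accumulate bytes into a single reusable response buffer and
// report when a complete 4-byte-length-prefixed response has arrived.
public class FrameAccumulator {
    private final ByteBuffer responseBuffer;

    public FrameAccumulator(ByteBuffer responseBuffer) {
        this.responseBuffer = responseBuffer;
    }

    // Call after each channel.read(responseBuffer). Returns true once the
    // full response (4-byte length prefix + body) is present.
    public boolean frameComplete() {
        int bytesReceived = responseBuffer.position();
        if (bytesReceived < 4) {
            return false; // we don't even know the size yet
        }
        int bodySize = responseBuffer.getInt(0); // absolute read, no state change
        return bytesReceived >= 4 + bodySize;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        FrameAccumulator acc = new FrameAccumulator(buffer);
        buffer.putInt(8); // length prefix: an 8-byte body follows
        System.out.println(acc.frameComplete()); // false: body not here yet
        buffer.putLong(0xCAFEBABEL);
        System.out.println(acc.frameComplete()); // true: 4 + 8 bytes received
    }
}
```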
> > > >
> > > > iii) Once an entire response has been iterated through I can re-use both
> > > > the request as well as the response buffers.
> > > >
> > > > I am sure this can be improved upon a lot. I allocate the following
> > > > before starting my client:
> > > >     i) A Direct ByteBuffer for requests.
> > > >    ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers are
> > > > chosen so that they can fit the biggest requests/responses we expect.
> > > >   iii) A flyweight that wraps the Request ByteBuffer and can be used to
> > > > write a particular kind of request. So far I have only written a flyweight
> > > > for a FetchRequest.
> > > >    iv) A flyweight that wraps the Response ByteBuffer and can be used to
> > > > iterate through the entire response, including finding errors. There is no
> > > > random access allowed right now. So far I have only written a flyweight
> > > > parser for a FetchResponse.
> > > >     v) A flyweight for every type of application level message that I
> > > > expect. Iterating through the response ByteBuffer using the flyweight in
> > > > (iv) I get offsets into the ByteBuffer for each individual message. My own
> > > > messages work by using absolute position getLong(position),
> > > > getShort(position) etc. calls on a ByteBuffer, so this works out great for
> > > > me. We could alternatively provide an API that allocates a new ByteBuffer
> > > > and copies the data for people who don't want the zero allocation access.
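An application-level flyweight in the style described above, using absolute getLong/getShort calls against a shared ByteBuffer so that iterating messages allocates nothing. The class name and field offsets are made up for the example:

```java
import java.nio.ByteBuffer;

// Illustrative flyweight: absolute gets against a shared ByteBuffer.
final class ExampleMessageFlyweight {
    private ByteBuffer buffer;
    private int offset;

    void wrap(ByteBuffer buffer, int offset) {
        this.buffer = buffer;
        this.offset = offset;
    }

    long timestamp() { return buffer.getLong(offset); }      // bytes 0-7 of the message
    short kind()     { return buffer.getShort(offset + 8); } // bytes 8-9 of the message
}

public class FlyweightDemo {
    public static void main(String[] args) {
        ByteBuffer big = ByteBuffer.allocateDirect(256); // stands in for the response buffer
        big.putLong(16, 1234567890L);
        big.putShort(24, (short) 7);

        ExampleMessageFlyweight message = new ExampleMessageFlyweight();
        message.wrap(big, 16); // point at a message 16 bytes into the buffer
        System.out.println(message.timestamp()); // 1234567890
        System.out.println(message.kind());      // 7
    }
}
```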
> > > >
> > > > Sadly, in my profiling I noticed that the selector implementation in the
> > > > JDK allocates, but it seems like projects like Netty and Aeron have worked
> > > > around this by using reflection to replace the underlying implementation
> > > > to make it non-allocating. Other than that I have absolutely zero
> > > > allocations past the initial 4-5 allocations. I also have absolutely zero
> > > > copies in user space once the data lands from the socket onto the
> > > > ByteBuffer.
> > > >
> > > > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <ra...@signalfuse.com
> >
> > > > wrote:
> > > >
> > > > > I had a few more thoughts on the new API. Currently we use Kafka to
> > > > > transfer really compact messages - around 25-35 bytes each. Our use case
> > > > > is a lot of messages, each very small. Will it be possible to do the
> > > > > following to reuse a ConsumerRecord and the ConsumerRecords objects? We
> > > > > employ our own binary encoding, encoded as a flyweight in
> > > > > SBE/Cap'n Proto style, so our messages can be consumed without any
> > > > > decoding/deserialization step. Currently we use the SimpleConsumer and
> > > > > have found the fact that it hands out ByteBuffers very useful, instead
> > > > > of a mandatory deserialization-into-a-POJO step. Even so, through memory
> > > > > profiling we have found that the ByteBuffers and the records take more
> > > > > space than the actual messages themselves. Ideally we could allocate a
> > > > > big ByteBuffer (maybe Kafka already does this) to receive our data and
> > > > > then get some kind of flyweight iterator on the ByteBuffer, something
> > > > > like this:
> > > > >
> > > > > // Allocate a single ByteBuffer, or have Kafka allocate this internally.
> > > > > // Either way it would be very useful to just keep re-using this buffer.
> > > > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > > > // Create a couple of flyweights.
> > > > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > > > ConsumerRecord record = new ConsumerRecord();
> > > > > // Subscribe to the topics we care about.
> > > > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > > > while (running) {
> > > > >   // Use the buffer to get data from the server, re-using the same
> > > > >   // iterator. This resets the iterator to the start of the ByteBuffer.
> > > > >   consumer.poll(buffer, iterator, timeout);
> > > > >   // The iterator now points into the ByteBuffer and can advance its
> > > > >   // cursor to read every message.
> > > > >   while (iterator.hasNext()) {
> > > > >     // The consumer record is just a flyweight over the ByteBuffer, and
> > > > >     // is adjusted to point to the start of the next record.
> > > > >     iterator.getNextInto(record);
> > > > >     // This is not a new ByteBuffer; it is the big buffer with its
> > > > >     // position and limit adjusted so that we only read the current
> > > > >     // record. Alternatively we could supply our own ByteBuffer, but the
> > > > >     // point would be to avoid a copy and instead adjust the supplied
> > > > >     // ByteBuffer's position, limit and pointer (through reflection) to
> > > > >     // point to the right slice of the actual big ByteBuffer. This means
> > > > >     // we cannot stash this buffer in a hash map or any heap allocated
> > > > >     // structure, since its contents keep changing as we iterate.
> > > > >     ByteBuffer recordBuffer = record.getUnderlying();
> > > > >     // process() cannot keep a reference to the buffer - this is really
> > > > >     // the programmer's responsibility.
> > > > >     process(recordBuffer);
> > > > >   }
> > > > > }
> > > > >
> > > > > Given how the new consumer is meant to be used from a single thread, I
> > > > > think these optional non-allocating methods would be a great boon for
> > > > > anyone trying to save memory or prevent heap churn. This approach has
> > > > > exactly 0 copies in user space too, which is great for performance. In
> > > > > our case, since the messages are very tiny, we end up spending more
> > > > > memory on all the wrapper objects than on the actual messages, so this
> > > > > would be a game changer for us. I'd love to be able to contribute if
> > > > > this seems sensible. Hopefully these non-allocating methods can co-exist
> > > > > with the allocating ones, and only users who absolutely need them would
> > > > > make the trade-off of better efficiency/performance for a slightly more
> > > > > error-prone and ugly API.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > Thanks,
> > > > > Rajiv
> > > > >
> > > > >
> > > > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo
> > <alter...@yahoo.com.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > >> Hi all, the document is very beautiful. Which Kafka release will this
> > > > >> be in, and what is the timeline?
> > > > >> Thanks, Edwin
> > > > >>
> > > > >>
> > > > >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> > > > >> ra...@signalfuse.com> wrote:
> > > > >>
> > > > >>
> > > > >>  Awesome - can't wait for this version to be out!
> > > > >>
> > > > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <jay.kr...@gmail.com>
> > > > wrote:
> > > > >>
> > > > >> > The timeout in the poll call is more or less the timeout used by the
> > > > >> > selector. So each call to poll will do socket activity on any ready
> > > > >> > sockets, waiting for up to that time for a socket to become ready.
> > > > >> > There are no longer any background threads involved in the consumer;
> > > > >> > all activity is driven by the application thread(s).
> > > > >> >
> > > > >> > The max fetch request wait time is controlled with a config and is
> > > > >> > independent of the time given to poll.
> > > > >> >
> > > > >> > -Jay
> > > > >> >
> > > > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <
> > > ra...@signalfuse.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > I am trying to understand the semantics of the timeout specified
> > > > >> > > in the poll method in
> > > > >> > > http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > >> > > Is this timeout a measure of how long the fetch request will be
> > > > >> > > parked on the broker waiting for a reply, or is it more like the
> > > > >> > > timeout in selector.select(long timeout), i.e. the method will
> > > > >> > > return with whatever data is there after waiting at most that long?
> > > > >> > > Exposing the selector timeout would be very helpful for us because
> > > > >> > > we want to put a tight bound on how long we are ready to wait in
> > > > >> > > the poll call. When this API is available we plan to use a single
> > > > >> > > thread to get data from Kafka, process it, and run periodic jobs.
> > > > >> > > For the periodic jobs to run we need a guarantee on how much time
> > > > >> > > the poll call can take at most.
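The single-threaded loop described above, with the selector wait bounded by the time until the next periodic job, might be sketched like this. The class and method names are assumptions; the Kafka-specific socket handling is elided:

```java
import java.io.IOException;
import java.nio.channels.Selector;

// Sketch: bound the selector wait by the time until the next periodic
// job, so polling can never starve the periodic work.
public class BoundedPollLoop {
    // How long we may block in select() without delaying the next job.
    static long boundedWait(long now, long nextJobDueAt) {
        return Math.max(1, nextJobDueAt - now);
    }

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        final long periodMs = 50;
        long nextJobDueAt = System.currentTimeMillis() + periodMs;

        for (int i = 0; i < 3; i++) { // a real loop would run until shutdown
            long wait = boundedWait(System.currentTimeMillis(), nextJobDueAt);
            selector.select(wait); // returns early if sockets become ready
            // ... read ready sockets and hand out messages here ...
            if (System.currentTimeMillis() >= nextJobDueAt) {
                // ... run periodic jobs here ...
                nextJobDueAt += periodMs;
            }
        }
        selector.close();
    }
}
```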
> > > > >> > >
> > > > >> > > Thanks!
> > > > >> > >
> > > > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <
> > > ra...@signalfuse.com
> > > > >
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Thanks!
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > On Thursday, March 19, 2015, Jay Kreps <jay.kr...@gmail.com
> >
> > > > wrote:
> > > > >> > > >
> > > > >> > > >> Err, here:
> > > > >> > > >> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > >> > > >>
> > > > >> > > >> -Jay
> > > > >> > > >>
> > > > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <
> > > jay.kr...@gmail.com>
> > > > >> > wrote:
> > > > >> > > >>
> > > > >> > > >> > The current work in progress is documented here:
> > > > >> > > >> >
> > > > >> > > >> >
> > > > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> > > > >> ra...@signalfuse.com
> > > > >> > >
> > > > >> > > >> > wrote:
> > > > >> > > >> >
> > > > >> > > >> >> Is there a link to the proposed new consumer
> non-blocking
> > > API?
> > > > >> > > >> >>
> > > > >> > > >> >> Thanks,
> > > > >> > > >> >> Rajiv
> > > > >> > > >> >>
> > > > >> > > >> >
> > > > >> > > >> >
> > > > >> > > >>
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
