Zijing, the new consumer will be in the next release. We don't have a hard
date for this yet.

Rajiv, I'm game if we can show a >= 20% performance improvement. It
certainly could be an improvement, but it might also be that the CRC
validation and compression dominate.

The first step would be
  https://issues.apache.org/jira/browse/KAFKA-1895

This would delay the deserialization of ConsumerRecords and make it
basically a wrapper around the actual MemoryRecords chunks which are
basically ByteBuffer instances. We could then add an optional
ConsumerRecords param in poll, to allow you to hand back your
ConsumerRecords instance. Optimizing the ConsumerRecord instance reuse
would also be possible.

That JIRA is actually useful irrespective of these optimizations though
because deferring the deserialization and decompression would allow you to
punt that work into a pool of processor threads. Since it is more common to
see the real bottleneck be application serialization this could be valuable.

WRT the object reuse I wouldn't be shocked to learn that you actually get
equivalent stuff out of the jvm allocator's own pooling and/or escape
analysis once we are doing the allocation on demand. So it would be good to
show a real performance improvement on the newer JVMs before deciding to go
this route.

-Jay


On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Just a follow up - I have implemented a pretty hacky prototype It's too
> unclean to share right now but I can clean it up if you are interested. I
> don't think it offers anything that people already don't know about though.
>
> My prototype doesn't do any metadata requests yet but I have a flyweight
> builder/parser of the FetchRequest and the FetchResponse protocol that I
> based on the protocol page at
>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> .
> On my consumer thread I allocate two buffers per broker a small request
> buffer and a bigger response buffer. My assumption is that the response
> buffer will be big enough that it is never too small for the response.  If
> this assumption doesn't hold then we would have to reallocate a bigger
> buffer. These are the steps I am following right now:
>
> i) Whenever a client makes a request I fill up the relevant request
> buffer(s) using my flyweight object instead of allocating an object and
> then serializing it. I then write this buffer to the SocketChannel
> connected to the right Kafka broker. I don't yet handle the case where all
> of the request bytes could not be  written synchronously. If I were to
> handle that properly I would need to register WRITE interest on the
> SocketChannel on incomplete writes.
>
> ii) Whenever the client calls poll() I register read interest on the socket
> channel using the selector and make a select call on the underlying
> selector. If the selector says that the socket channel is readable, I read
> all the bytes possible into my response ByteBuffer. If I can read the first
> 4 bytes I know how big the response is. So I wait till all response bytes
> have been read before attempting to parse it. This might take multiple poll
> calls. Once I have received the expected number of bytes I iterate through
> the response ByteBuffer. There is no random access access provided to the
> ByteBuffer since there are way too many variable length fields. This a
> DISADVANTAGE of doing flyweight style parsing instead of deserialization
> into POJOs with indexed data structures like maps.  I could build indexing
> with a non-allocating primitive hashmap like the koloboke one mapping
> Topic/Partition/Messages to offsets in the ByteBuffer if really required.
> For me the lack of random access has not been a problem at all. Iteration
> is done just by providing a ByteBuffer and offset (within ByteBuffer) pair
> for each message. In my own application I wrap this ByteBuffer and offset
> pair in my own flyweight which knows how to decode the data.
>
> iii) Once an entire response has been iterated through I can re-use both
> the request as well as response buffers.
>
> I am sure this can be improved upon a lot.  I allocate the following before
> starting my client:
>     i) A Direct ByteBuffer for requests.
>    ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers are
> chosen so that they can fit the biggest request/responses we expect.
>    ii) A flyweight that wraps the Request ByteBuffer and can be used to
> write a particular kind of request.  So far I have only written a flyweight
> for a FetchRequest.
>   iii) A flyweight that wraps the Response ByteBuffer and can be used to
> iterate through the entire response including finding errors. There is no
> random access allowed right now. So far I have only written a flyweight
> parser for a FetchResponse.
>   iv) A flyweight for every type of application level message that I
> expect. Iterating through the response ByteBuffer using the flyweight in
> (iii) I get offsets into the ByteBuffer for each individual message. My own
> messages work by using absolute position getLong(position),
> getShort(position) etc calls on a ByteBuffer, so this works out great for
> me. We could alternatively provide an API that does allocates a new
> ByteBuffer and copies the data for people who don't want the zero
> allocation access.
>
> Sadly in my profiling I noticed that selector implementation in the
> JDK allocates
> but it seems like projects like Netty, Aeron have worked around this by
> using reflection to replace the underlying implementation to make it
> non-allocating. Other than that I have absolutely zero allocations past the
> initial 4-5 allocations. I also have absolutely zero copies in user space
> once the data lands from the socket onto the ByteBuffer.
>
> On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > I had a few more thoughts on the new API. Currently we use kafka to
> > transfer really compact messages - around 25-35 bytes each. Our use case
> is
> > a lot of messages but each very small. Will it be possible to do the
> > following
> > to reuse a ConsumerRecord and the ConsumerRecords objects? We employ our
> > own binary encoding to encode our messages and it is encoded as a
> flyweight
> > in SBE/Cap'n'Proto style and can be consumed without any
> > decoding/deserialization step. Currently we use the SimpleConsumer and
> have
> > found the fact that it hands out ByteBuffers very useful instead of a
> > mandatory deserialization into a POJO step. Even then through memory
> > profiling we have found out that the ByteBuffers and the records take
> more
> > space than the actual messages themselves. Ideally we can allocate a big
> > ByteBuffer (maybe kafka already does it) to receive our data and then
> just
> > get some kind of a flyweight iterator on the ByteBuffer something like
> this:
> >
> > // Allocate a single ByteBuffer or have kafka allocate this internally.
> > Either way it would be very useful to just keep re-using this buffer.
> > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > // Create a coupe of flyweights.
> > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > ConsumerRecord record = new ConsumerRecord();
> > // Subscribe to the topics we care about.
> > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > while (running) {
> >   // Use the buffer to get data from the server. Additionally  re-use the
> > same iterator.
> >   // Resets the iterator to point to the start of the ByteBuffer.
> >   consumer.poll(buffer, iterator, timeout);
> >   // Now the iterator has pointers to the ByteBuffer and is capable of
> > advancing the cursor to read every message.
> >   while (iterator.hasNext()) {
> >     // The consumer record is just a flyweight over a ByteBuffer and is
> > adjusted to point to the start of the next record.
> >     iterator.getNextInto(record);
> >     // This is not a new ByteBuffer, its just the big buffer with its
> > position and limit adjusted so that we only read the current record.
> >     // Alternatively we could give it our own ByteBuffer, but the point
> > would be to not do a copy and instead adjust the supplied
> >     // ByteBuffer's position and limit and pointer (through reflection)
> to
> > point to the right slice of the actual big ByteBuffer.
> >     // This means that we cannot stash this buffer in a hash map or any
> > heap allocated structure since it's contents keep changing as we iterate.
> >     ByteBuffer buffer = record.getUnderlying();
> >     process(buffer);  // Process cannot keep a reference to the buffer -
> > this is really the programmer's responsibility.
> >   }
> > }
> >
> > Given how the new consumer is meant to be used from a single thread - I
> > think these optional non-allocating methods will be a great boon for any
> > one trying to save memory or prevent heap churn. It has exactly 0 copies
> in
> > user space too which is great for performance. In our case since the
> > messages are very tiny we end up spending more memory in all the wrapper
> > objects than in the actual messages so this would be a game changer for
> us.
> > I'd love to be able to contribute if this seems sensible. Hopefully these
> > non-allocating methods can co-exist with the allocating ones and only
> users
> > who absolutely need to use them can make the trade-off  of better
> > efficiency/performance for a slightly more error-prone and ugly API.
> >
> > Thoughts?
> >
> > Thanks,
> > Rajiv
> >
> >
> > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo <alter...@yahoo.com.invalid>
> > wrote:
> >
> >> Hi all,The document is very beautiful and the Kafka release version for
> >> this will be? and what is the timeline?
> >> ThanksEdwin
> >>
> >>
> >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> >> ra...@signalfuse.com> wrote:
> >>
> >>
> >>  Awesome - can't wait for this version to be out!
> >>
> >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> >>
> >> > The timeout in the poll call is more or less the timeout used by the
> >> > selector. So each call to poll will do socket activity on any ready
> >> > sockets, waiting for up to that time for a socket to be ready. There
> is
> >> no
> >> > longer any background threads involved in the consumer, all activity
> is
> >> > driven by the application thread(s).
> >> >
> >> > The max fetch request wait time is controlled with a config and is
> >> > independent of the time given to poll.
> >> >
> >> > -Jay
> >> >
> >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <ra...@signalfuse.com>
> >> > wrote:
> >> >
> >> > > I am trying to understand the semantics of the timeout specified in
> >> the
> >> > > poll method in
> >> > >
> >> > >
> >> >
> >>
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> >> > > .
> >> > > Is this timeout a measure of how long the fetch request will be
> >> parked on
> >> > > the broker waiting for a reply or is this more like the timeout in
> >> > > selector.select(long timeout) i.e. the method will return with
> >> whatever
> >> > > data is there after waiting a maximum of timeout. Exposing the
> >> selector
> >> > > timeout will be very helpful for us because we want to put a tight
> >> bound
> >> > on
> >> > > how long we are ready to wait on the poll call. When this API is
> >> > available
> >> > > we plan to use a single thread to get data from kafka, process them
> as
> >> > well
> >> > > as run periodic jobs. For the periodic jobs to run we need a
> >> guarantee on
> >> > > how much time the poll call can take at most.
> >> > >
> >> > > Thanks!
> >> > >
> >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <ra...@signalfuse.com
> >
> >> > > wrote:
> >> > >
> >> > > > Thanks!
> >> > > >
> >> > > >
> >> > > > On Thursday, March 19, 2015, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> >> > > >
> >> > > >> Err, here:
> >> > > >>
> >> > > >>
> >> > >
> >> >
> >>
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> >> > > >>
> >> > > >> -Jay
> >> > > >>
> >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <jay.kr...@gmail.com>
> >> > wrote:
> >> > > >>
> >> > > >> > The current work in progress is documented here:
> >> > > >> >
> >> > > >> >
> >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> >> ra...@signalfuse.com
> >> > >
> >> > > >> > wrote:
> >> > > >> >
> >> > > >> >> Is there a link to the proposed new consumer non-blocking API?
> >> > > >> >>
> >> > > >> >> Thanks,
> >> > > >> >> Rajiv
> >> > > >> >>
> >> > > >> >
> >> > > >> >
> >> > > >>
> >> > > >
> >> > >
> >> >
> >>
> >>
> >>
> >>
> >
> >
>

Reply via email to