I had a few more thoughts on the new API. Currently we use Kafka to
transfer really compact messages, around 25-35 bytes each. Our use case is
a lot of messages, each very small. Would it be possible to reuse the
ConsumerRecord and ConsumerRecords objects? We employ our own binary
encoding for our messages: each one is encoded as a flyweight in
SBE/Cap'n Proto style and can be consumed without any
decoding/deserialization step. Currently we use the SimpleConsumer and have
found it very useful that it hands out ByteBuffers instead of forcing a
mandatory deserialization into a POJO. Even so, memory profiling shows that
the ByteBuffers and the records take more space than the actual messages
themselves. Ideally we could allocate one big ByteBuffer (maybe Kafka
already does this) to receive our data and then just get some kind of
flyweight iterator over the ByteBuffer, something like this:

// Allocate a single ByteBuffer or have Kafka allocate this internally.
// Either way it would be very useful to just keep re-using this buffer.
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
// Create a couple of flyweights.
ConsumerRecordIterator iterator = new ConsumerRecordIterator();
ConsumerRecord record = new ConsumerRecord();
// Subscribe to the topics we care about.
consumer.subscribe(TopicPartitions_I_am_interested_in);
while (running) {
  // Use the buffer to get data from the server, re-using the same
  // iterator. The call resets the iterator to point to the start of
  // the ByteBuffer.
  consumer.poll(buffer, iterator, timeout);
  // Now the iterator has pointers into the ByteBuffer and is capable of
  // advancing the cursor to read every message.
  while (iterator.hasNext()) {
    // The consumer record is just a flyweight over a ByteBuffer and is
    // adjusted to point to the start of the next record.
    iterator.getNextInto(record);
    // This is not a new ByteBuffer, it's just the big buffer with its
    // position and limit adjusted so that we only read the current
    // record. Alternatively we could give it our own ByteBuffer, but the
    // point would be to not do a copy and instead adjust the supplied
    // ByteBuffer's position, limit and pointer (through reflection) to
    // point to the right slice of the actual big ByteBuffer.
    // This means that we cannot stash this buffer in a hash map or any
    // heap-allocated structure, since its contents keep changing as we
    // iterate.
    ByteBuffer recordBuffer = record.getUnderlying();
    // process() must not keep a reference to the buffer; this is really
    // the programmer's responsibility.
    process(recordBuffer);
  }
}
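
To make the flyweight idea concrete outside of Kafka, here is a minimal,
self-contained sketch. It assumes a made-up layout (a 4-byte length prefix
followed by the payload), which is not Kafka's actual message format, and
the names RecordCursor and FlyweightDemo are hypothetical, not part of any
Kafka API. It only illustrates re-windowing one shared view over a big
ByteBuffer instead of allocating an object per record.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;

// Hypothetical cursor (not a Kafka class): walks length-prefixed records in
// one big buffer and exposes each payload through a single reusable view.
final class RecordCursor {
    private ByteBuffer backing; // the big, re-used buffer
    private ByteBuffer view;    // shares backing's memory; re-windowed per record
    private int nextOffset;     // offset of the next length prefix
    private int end;            // end of the valid data

    // Points the cursor at the readable region [position, limit) of backing.
    void reset(ByteBuffer backing) {
        if (this.backing != backing) {
            this.backing = backing;
            this.view = backing.duplicate(); // allocated once per backing buffer
        }
        this.nextOffset = backing.position();
        this.end = backing.limit();
    }

    boolean hasNext() {
        return end - nextOffset >= Integer.BYTES;
    }

    // Moves the shared view onto the next payload without copying any bytes.
    ByteBuffer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        int length = backing.getInt(nextOffset); // assumed 4-byte length prefix
        int start = nextOffset + Integer.BYTES;
        if (length < 0 || length > end - start) {
            throw new IllegalStateException("corrupt length prefix: " + length);
        }
        view.clear();               // position = 0, limit = capacity
        view.limit(start + length);
        view.position(start);
        nextOffset = start + length;
        return view;
    }
}

final class FlyweightDemo {
    // Writes one record in the assumed layout: 4-byte length, then payload.
    static void append(ByteBuffer buf, String message) {
        byte[] bytes = message.getBytes(StandardCharsets.US_ASCII);
        buf.putInt(bytes.length).put(bytes);
    }

    // Stand-in for process(): reads the payload in place, keeps no reference.
    static int checksum(ByteBuffer payload) {
        int sum = 0;
        while (payload.hasRemaining()) {
            sum += payload.get();
        }
        return sum;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        append(buffer, "tick");
        append(buffer, "tock");
        buffer.flip(); // switch from writing to reading

        RecordCursor cursor = new RecordCursor();
        cursor.reset(buffer);
        while (cursor.hasNext()) {
            ByteBuffer payload = cursor.next(); // same object on every iteration
            System.out.println(payload.remaining() + " bytes, checksum "
                    + checksum(payload));
        }
    }
}
```

The same caveat applies here as in the proposal: the ByteBuffer returned by
next() is re-pointed on the following call, so anything the caller wants to
keep has to be copied out first.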

Given that the new consumer is meant to be used from a single thread, I
think these optional non-allocating methods would be a great boon for
anyone trying to save memory or prevent heap churn. They also involve
exactly zero copies in user space, which is great for performance. In our
case, since the messages are very tiny, we end up spending more memory on
the wrapper objects than on the actual messages, so this would be a game
changer for us. I'd love to contribute if this seems sensible. Hopefully
these non-allocating methods can co-exist with the allocating ones, so that
only users who absolutely need them make the trade-off of better
efficiency/performance for a slightly more error-prone and ugly API.

Thoughts?

Thanks,
Rajiv


On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo <alter...@yahoo.com.invalid>
wrote:

> Hi all,
> The document is very beautiful. Which Kafka release will this be in, and
> what is the timeline?
> Thanks,
> Edwin
>
>
>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
>
>  Awesome - can't wait for this version to be out!
>
> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > The timeout in the poll call is more or less the timeout used by the
> > selector. So each call to poll will do socket activity on any ready
> > sockets, waiting for up to that time for a socket to be ready. There are
> > no longer any background threads involved in the consumer; all activity
> > is driven by the application thread(s).
> >
> > The max fetch request wait time is controlled with a config and is
> > independent of the time given to poll.
> >
> > -Jay
> >
> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > I am trying to understand the semantics of the timeout specified in the
> > > poll method documented here:
> > >
> > > http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >
> > > Is this timeout a measure of how long the fetch request will be parked
> > > on the broker waiting for a reply, or is it more like the timeout in
> > > selector.select(long timeout), i.e. the method will return with whatever
> > > data is there after waiting a maximum of timeout? Exposing the selector
> > > timeout would be very helpful for us because we want to put a tight
> > > bound on how long we are ready to wait on the poll call. When this API
> > > is available we plan to use a single thread to get data from Kafka,
> > > process it, and run periodic jobs. For the periodic jobs to run we need
> > > a guarantee on how much time the poll call can take at most.
> > >
> > > Thanks!
> > >
> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > >
> > > > Thanks!
> > > >
> > > >
> > > > On Thursday, March 19, 2015, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > >> Err, here:
> > > >>
> > > >>
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > >>
> > > >> -Jay
> > > >>
> > > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <jay.kr...@gmail.com>
> > > > >> wrote:
> > > >>
> > > >> > The current work in progress is documented here:
> > > >> >
> > > >> >
> > > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > > >> > wrote:
> > > >> >
> > > >> >> Is there a link to the proposed new consumer non-blocking API?
> > > >> >>
> > > >> >> Thanks,
> > > >> >> Rajiv
> > > >> >>
> > > >> >
> > > >> >
> > > >>
> > > >
> > >
> >
>
>
>
>
