Hi Jens,

Gut feeling is that it's not a trivial patch, but you're more than welcome
to take a shot. We should probably do a KIP also since it's a change to a
public API (even if we only just released it). That's also a good way to
get feedback and make sure we're not missing a better approach. Here's a
link to the KIP guide:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.
Let me know if I can help out.

-Jason

On Wed, Dec 16, 2015 at 2:07 PM, Jens Rantil <jens.ran...@tink.se> wrote:

> Hi again Jason,
>
> Once again thanks for your answer.
>
> Yes, the more I think/read about this, the more viable the "max records"
> approach sounds. Without knowing the code, I guess it would make more
> sense to create a "max.partition.fetch.messages" property. That way a
> consumer could optimize for quick fetches on startup instead of per
> poll() call. And if a consumer really would like to change the number of
> messages in real time, they could simply close the consumer and restart it.
>
> I spent 45 minutes trying to set up a development environment to have a
> look at the Kafka code and maybe submit a pull request for this. Do you
> think this would be hard to implement? Would introducing this need a larger
> consensus/discussion in KAFKA-2986?
>
> Last, but not least, I'm happy to hear that this case is something that
> Kafka should handle. I've reviewed many queueing solutions, and Kafka
> really seems like the absolute best solution to our problem as long as we
> can overcome this issue.
>
> Thanks,
> Jens
>
> On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io> wrote:
>
> > I was talking with Jay this afternoon about this use case. The tricky
> > thing about adding a ping() or heartbeat() API is that you have to deal
> > with the potential for rebalancing. This means either allowing it to
> > block while a rebalance completes or having it raise an exception
> > indicating that a rebalance is needed. In code, the latter might look
> > like this:
> >
> > while (running) {
> >   ConsumerRecords<K, V> records = consumer.poll(1000);
> >   try {
> >     for (ConsumerRecord record : records) {
> >       process(record);
> >       consumer.heartbeat();
> >     }
> >   } catch (RebalanceException e) {
> >     continue;
> >   }
> > }
> >
> > Unfortunately, this wouldn't work with auto-commit since it would tend to
> > break message processing early which would let the committed position get
> > ahead of the last offset processed. The alternative blocking approach
> > wouldn't be any better in this regard. Overall, it seems like this might
> > introduce a bigger problem than it solves.
> >
> > Perhaps the simpler solution is to provide a way to set the maximum
> > number of messages returned. This could either be a new configuration
> > option or a second argument in poll, but it would let you handle
> > messages one-by-one if you needed to. You'd then be able to set the
> > session timeout according to the expected time to handle a single
> > message. It'd be a bit more work to implement this, but if the use case
> > is common enough, it might be worthwhile.
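To make the trade-off concrete, here is a rough sizing sketch in plain Java. The max-records cap is hypothetical (no such setting exists in the consumer today); the point is only that the session timeout would then have to cover just one capped batch of worst-case processing plus some slack:

```java
public class SessionTimeoutSizing {
    // If poll() could be capped at maxRecords messages (hypothetical
    // setting), the session timeout only has to cover one batch of
    // worst-case processing plus some slack before the next poll().
    static long requiredSessionTimeoutMs(int maxRecords,
                                         long maxPerMessageMs,
                                         long slackMs) {
        return (long) maxRecords * maxPerMessageMs + slackMs;
    }

    public static void main(String[] args) {
        // One message per poll, up to 20s each, 10s slack:
        System.out.println(requiredSessionTimeoutMs(1, 20_000, 10_000));
        // A 500-record batch at 200ms per message:
        System.out.println(requiredSessionTimeoutMs(500, 200, 10_000));
    }
}
```

With a cap of one message per poll, the timeout tracks a single message's worst case rather than a whole (unbounded) batch, which is exactly what this use case needs.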
> >
> > -Jason
> >
> > On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hey Jens,
> > >
> > > The purpose of pause() is to stop fetches for a set of partitions.
> > > This lets you continue calling poll() to send heartbeats. Also note
> > > that poll() generally only blocks for rebalances. In code, something
> > > like this is what I was thinking:
> > >
> > > while (running) {
> > >   ConsumerRecords<K, V> records = consumer.poll(1000);
> > >   if (queue.offer(records))
> > >     continue;
> > >
> > >   TopicPartition[] assignment = toArray(consumer.assignment());
> > >   consumer.pause(assignment);
> > >   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > >     consumer.poll(0);
> > >   consumer.resume(assignment);
> > > }
> > >
> > > The tricky thing is handling rebalances since they might occur in
> > > either call to poll(). In a rebalance, you have to 1) drain the queue,
> > > 2) commit current offsets, and 3) maybe break from the inner poll
> > > loop. If the processing thread is busy when the rebalance is
> > > triggered, then you may have to discard the results when it's
> > > finished. It's also a little difficult communicating completion to the
> > > poll loop, which is where the offset commit needs to take place. I
> > > suppose another queue would work, sigh.
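Stripped of the Kafka specifics, the handoff pattern in that loop can be sketched with plain java.util.concurrent pieces. Here poll()/pause()/resume() are no-op stubs (the real calls would keep the session alive and gate fetching); only the bounded-queue mechanics are real:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueHandoffSketch {
    // Bounded handoff queue between the poll loop and a processing thread.
    static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

    // No-op stubs standing in for consumer.poll()/pause()/resume().
    static void poll() {}
    static void pause() {}
    static void resume() {}

    // One iteration of the poll loop: try a non-blocking handoff first; if
    // the queue is full, pause fetching and keep "polling" (which would
    // keep the session alive) until the processor frees a slot.
    static void handOff(String records) throws InterruptedException {
        if (queue.offer(records))
            return;
        pause();
        while (!queue.offer(records, 10, TimeUnit.MILLISECONDS))
            poll();
        resume();
    }

    public static void main(String[] args) throws InterruptedException {
        queue.clear();
        handOff("a");
        handOff("b");                     // queue is now full
        Thread processor = new Thread(() -> {
            try {
                Thread.sleep(50);
                queue.take();             // processor drains one batch
            } catch (InterruptedException ignored) {}
        });
        processor.start();
        handOff("c");                     // retries until a slot opens
        processor.join();
        System.out.println(queue.size()); // prints 2
    }
}
```

The key property is that the poll loop never blocks longer than the offer timeout, so the keep-alive call happens regularly no matter how slow the processor is.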
> > >
> > > Well, I think you can make that work, but I tend to agree that it's
> > > pretty complicated. Perhaps instead of a queue, you should just submit
> > > the processor to an executor service for each record set returned and
> > > await its completion directly. For example:
> > >
> > > while (running) {
> > >   ConsumerRecords<K, V> records = consumer.poll(1000);
> > >   Future future = executor.submit(new Processor(records));
> > >
> > >   TopicPartition[] assignment = toArray(consumer.assignment());
> > >   consumer.pause(assignment);
> > >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > >     consumer.poll(0);
> > >   consumer.resume(assignment);
> > >   consumer.commitSync();
> > > }
> > >
> > > This seems closer to the spirit of the poll loop, and it makes
> > > handling commits a lot easier. You still have to deal with the
> > > rebalance problem, but at least you don't have to deal with the queue.
> > > It's still a little complex though. Maybe the consumer needs a ping()
> > > API which does the same thing as poll() but doesn't send or return any
> > > fetches. That would simplify things a little more:
> > >
> > > while (running) {
> > >   ConsumerRecords<K, V> records = consumer.poll(1000);
> > >   Future future = executor.submit(new Processor(records));
> > >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > >     consumer.ping();
> > >   consumer.commitSync();
> > > }
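The complete(...) helper in those loops is left undefined. A minimal standalone version of the idea, with ping() stubbed out as a counter since it isn't a real consumer API, might look like this:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class KeepAliveAwait {
    static final AtomicInteger pings = new AtomicInteger();

    // Stub standing in for the hypothetical consumer.ping() (or poll(0)).
    static void ping() { pings.incrementAndGet(); }

    // Wait for the future, waking at least once per heartbeat interval so
    // a keep-alive call goes out while processing is still in flight.
    static <T> T awaitWithKeepAlive(Future<T> future, long heartbeatMs)
            throws ExecutionException, InterruptedException {
        while (true) {
            try {
                return future.get(heartbeatMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                ping();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            Thread.sleep(120);   // simulate slow record processing
            return "done";
        });
        String result = awaitWithKeepAlive(future, 25);
        System.out.println(result);
        executor.shutdown();
    }
}
```

Future.get(timeout) does the bounded waiting, so the loop body stays trivial and the heartbeat cadence is never tied to how long processing takes.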
> > >
> > > Anyway, I'll think about it a little more and see if any other
> > > approaches come to mind. I do agree that we should have a way to
> > > handle this case without too much extra work.
> > >
> > >
> > > -Jason
> > >
> > >
> > > On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil <jens.ran...@tink.se>
> > wrote:
> > >
> > >> Hi Jason,
> > >>
> > >> Thanks for your response. See replies inline:
> > >>
> > >> On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io>
> > >> wrote:
> > >>
> > >> > Hey Jens,
> > >> >
> > >> > I'm not sure I understand why increasing the session timeout is
> > >> > not an option. Is the issue that there's too much uncertainty about
> > >> > processing time to set an upper bound for each round of the poll
> > >> > loop?
> > >> >
> > >>
> > >> Yes, that's the issue.
> > >>
> > >>
> > >> > One general workaround would be to move the processing into
> > >> > another thread. For example, you could add messages to a blocking
> > >> > queue and have the processor poll them. We have a pause() API which
> > >> > can be used to prevent additional fetches when the queue is full.
> > >> > This would allow you to continue the poll loop instead of blocking
> > >> > in offer(). The tricky thing would be handling rebalances since
> > >> > you'd need to clear the queue and get the last offset from the
> > >> > processors. Enough people probably will end up doing something like
> > >> > this that we should probably work through an example on how to do
> > >> > it properly.
> > >> >
> > >>
> > >> Hm, as far as I've understood, the consumer will only send
> > >> heartbeats to the broker when poll() is being called. If I were to
> > >> call pause() on a consumer (from a separate thread), I understand
> > >> poll() will block indefinitely. Will the polling consumer still send
> > >> heartbeats when blocked? Or would a pause for too long (while my
> > >> records are being processed) eventually lead to a session timeout? If
> > >> the latter, that would sort of defeat the purpose since I am trying
> > >> to avoid unnecessary rebalancing of consumers when there is high
> > >> pressure on the consumers.
> > >>
> > >> Regarding the handling of rebalancing for the queue solution you
> > >> describe: it really sounds very complicated. It's probably doable,
> > >> but doesn't this sort of defeat the purpose of the high-level
> > >> consumer API? I mean, it sounds like it should gracefully handle slow
> > >> consumption of varying size. I might be wrong.
> > >>
> > >> Thanks,
> > >> Jens
> > >>
> > >>
> > >> --
> > >> Jens Rantil
> > >> Backend engineer
> > >> Tink AB
> > >>
> > >> Email: jens.ran...@tink.se
> > >> Phone: +46 708 84 18 32
> > >> Web: www.tink.se
> > >>
> > >> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> > >> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> > >> Twitter <https://twitter.com/tink>
> > >>
> > >
> > >
> >
>
>
