Hey Jens,

The purpose of pause() is to stop fetches for a set of partitions. This
lets you continue calling poll() to send heartbeats. Also note that poll()
generally only blocks for rebalances. In code, something like this is what
I was thinking:

while (running) {
  ConsumerRecords<K, V> records = consumer.poll(1000);
  // Fast path: the processor's queue has room, so keep fetching.
  if (queue.offer(records))
    continue;

  // Queue is full: stop fetching, but keep calling poll() so heartbeats go out.
  TopicPartition[] assignment = toArray(consumer.assignment());
  consumer.pause(assignment);
  while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
    consumer.poll(0);
  consumer.resume(assignment);
}

The tricky thing is handling rebalances since they might occur in either
call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
current offsets, and 3) maybe break from the inner poll loop. If the
processing thread is busy when the rebalance is triggered, then you may
have to discard the results when it's finished. It's also a little
difficult communicating completion to the poll loop, which is where the
offset commit needs to take place. I suppose another queue would work,
sigh.
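
As a rough illustration of the bookkeeping described above (not something from the consumer API), here is a minimal sketch with the Kafka calls left out. The Handoff class, its method names, and the "generation" counter are all hypothetical. The idea is that the poll thread tags each batch with a counter that it bumps on every rebalance. A second queue reports completion back to the poll thread, and results from a batch that was still in flight when the rebalance hit are discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical hand-off between the poll thread and one processing thread. */
final class Handoff<B> {
    /** A batch tagged with the rebalance generation it was fetched in. */
    static final class Tagged<B> {
        final int generation;
        final B batch;
        Tagged(int generation, B batch) {
            this.generation = generation;
            this.batch = batch;
        }
    }

    private final BlockingQueue<Tagged<B>> pending;
    private final BlockingQueue<Tagged<B>> finished = new LinkedBlockingQueue<>();
    private int generation = 0; // read and written by the poll thread only

    Handoff(int capacity) {
        pending = new ArrayBlockingQueue<>(capacity);
    }

    /** Poll thread: try to hand a batch over, waiting at most the given time. */
    boolean offer(B batch, long timeout, TimeUnit unit) throws InterruptedException {
        return pending.offer(new Tagged<>(generation, batch), timeout, unit);
    }

    /** Processing thread: block until a batch is available. */
    Tagged<B> take() throws InterruptedException {
        return pending.take();
    }

    /** Processing thread: report a batch as fully processed. */
    void done(Tagged<B> tagged) {
        finished.add(tagged);
    }

    /** Poll thread: finished batches from the current generation; stale ones are dropped. */
    List<B> committable() {
        List<Tagged<B>> all = new ArrayList<>();
        finished.drainTo(all);
        List<B> current = new ArrayList<>();
        for (Tagged<B> t : all) {
            if (t.generation == generation) {
                current.add(t.batch);
            }
        }
        return current;
    }

    /** Poll thread, on rebalance: drop queued batches and start a new generation. */
    int onRebalance() {
        List<Tagged<B>> dropped = new ArrayList<>();
        pending.drainTo(dropped);
        generation++;
        return dropped.size();
    }
}
```

In this sketch the poll thread would call committable() and commit those offsets before calling onRebalance(), so that work finished before the rebalance is not lost. Whether that ordering fits a real rebalance callback is an assumption to check against the consumer you are using.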

Well, I think you can make that work, but I tend to agree that it's pretty
complicated. Perhaps instead of a queue, you should just submit the
processor to an executor service for each record set returned and await its
completion directly. For example:

while (running) {
  ConsumerRecords<K, V> records = consumer.poll(1000);
  Future<?> future = executor.submit(new Processor(records));

  // Pause fetching and keep polling for heartbeats until processing finishes.
  TopicPartition[] assignment = toArray(consumer.assignment());
  consumer.pause(assignment);
  while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
    consumer.poll(0);
  consumer.resume(assignment);
  consumer.commitSync();
}

This seems closer to the spirit of the poll loop, and it makes handling
commits a lot easier. You still have to deal with the rebalance problem,
but at least you don't have to deal with the queue. It's still a little
complex though. Maybe the consumer needs a ping() API which does the same
thing as poll() but doesn't send or return any fetches. That would simplify
things a little more:

while (running) {
  ConsumerRecords<K, V> records = consumer.poll(1000);
  Future<?> future = executor.submit(new Processor(records));
  // ping() is the proposed API: heartbeat only, no fetches.
  while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
    consumer.ping();
  consumer.commitSync();
}
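
The executor-based loops above call complete(future, timeout, unit), which is not defined anywhere in this message. One plausible reading is a bounded wait on the Future, sketched below using only java.util.concurrent. The FutureWait class name is made up for the example, and letting a processing failure propagate (so the loop never reaches commitSync() for a failed batch) is an assumption about the intended behaviour.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical home for the complete() helper used in the loops above. */
final class FutureWait {
    private FutureWait() {}

    /**
     * Waits up to the given timeout for the task to finish.
     * Returns true if it finished normally, false if it is still running.
     * A failure inside the task is rethrown as ExecutionException, and a
     * cancelled task surfaces as the unchecked CancellationException.
     */
    static boolean complete(Future<?> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            // Not done yet: the caller can poll() once and then wait again.
            return false;
        }
    }
}
```

With a helper like this, each pass of the inner while loop waits at most heartbeatIntervalMs before the consumer gets another chance to heartbeat.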

Anyway, I'll think about it a little more and see if any other approaches
come to mind. I do agree that we should have a way to handle this case
without too much extra work.


-Jason


On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:

> Hi Jason,
>
> Thanks for your response. See replies inline:
>
> On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io> wrote:
>
> > Hey Jens,
> >
> > I'm not sure I understand why increasing the session timeout is not an
> > option. Is the issue that there's too much uncertainty about processing
> > time to set an upper bound for each round of the poll loop?
> >
>
> Yes, that's the issue.
>
>
> > One general workaround would be to move the processing into another
> > thread. For example, you could add messages to a blocking queue and have
> > the processor poll them. We have a pause() API which can be used to
> > prevent additional fetches when the queue is full. This would allow you
> > to continue the poll loop instead of blocking in offer(). The tricky
> > thing would be handling rebalances since you'd need to clear the queue
> > and get the last offset from the processors. Enough people probably will
> > end up doing something like this that we should probably work through an
> > example on how to do it properly.
> >
>
> Hm, as far as I've understood the consumer will only send heartbeats to the
> broker when poll() is being called. If I would call pause() on a consumer
> (from a separate thread) I understand poll() will block indefinitely. Will
> the polling consumer still send heartbeats when blocked? Or would a pause
> for too long (while my records are being processed) eventually lead to
> session timeout? If the latter, that would sort of defeat the purpose since
> I am trying to avoid unnecessary rebalancing of consumers when there is
> high pressure on the consumers.
>
> Regarding handling of rebalancing for the queue solution you describe: it
> really sounds very complicated. It's probably doable, but doesn't this sort
> of defeat the purpose of the high level consumer API? I mean, it sounds
> like it should gracefully handle slow consumption of varying size. I might
> be wrong.
>
> Thanks,
> Jens
>
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> <
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> >
>  Twitter <https://twitter.com/tink>
>
