Hi Jens,

Gut feeling is that it's not a trivial patch, but you're more than welcome to take a shot. We should probably do a KIP also, since it's a change to a public API (even if we only just released it). That's also a good way to get feedback and make sure we're not missing a better approach. Here's a link to the KIP guide:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

Let me know if I can help out.
-Jason

On Wed, Dec 16, 2015 at 2:07 PM, Jens Rantil <jens.ran...@tink.se> wrote:

> Hi again Jason,
>
> Once again, thanks for your answer.
>
> Yes, the more I think/read about this, the more viable the "max records"
> approach sounds. Without knowing the code, I guess it would make more
> sense to create a "max.partition.fetch.messages" property. That way a
> consumer could optimize for a quick fetch on startup instead of per
> poll() call. And if a consumer really would like to change the number of
> messages at runtime, it could simply close the consumer and restart it.
>
> I spent 45 minutes trying to set up a development environment to have a
> look at the Kafka code and maybe submit a pull request for this. Do you
> think this would be hard to implement? Would introducing this need a
> larger consensus/discussion in KAFKA-2986?
>
> Last, but not least, I'm happy to hear that this is a case that Kafka
> should handle. Of the many queueing solutions I've reviewed, Kafka really
> seems like the absolute best solution to our problem, as long as we can
> overcome this issue.
>
> Thanks,
> Jens
>
> On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io> wrote:
>
> > I was talking with Jay this afternoon about this use case. The tricky
> > thing about adding a ping() or heartbeat() API is that you have to deal
> > with the potential for rebalancing. This means either allowing it to
> > block while a rebalance completes or having it raise an exception
> > indicating that a rebalance is needed.
> > In code, the latter might look like this:
> >
> > while (running) {
> >     ConsumerRecords<K, V> records = consumer.poll(1000);
> >     try {
> >         for (ConsumerRecord<K, V> record : records) {
> >             process(record);
> >             consumer.heartbeat();
> >         }
> >     } catch (RebalanceException e) {
> >         continue;
> >     }
> > }
> >
> > Unfortunately, this wouldn't work with auto-commit, since it would tend
> > to break message processing early, which would let the committed
> > position get ahead of the last offset processed. The alternative
> > blocking approach wouldn't be any better in this regard. Overall, it
> > seems like this might introduce a bigger problem than it solves.
> >
> > Perhaps the simpler solution is to provide a way to set the maximum
> > number of messages returned. This could either be a new configuration
> > option or a second argument to poll(), but it would let you handle
> > messages one by one if you needed to. You'd then be able to set the
> > session timeout according to the expected time to handle a single
> > message. It'd be a bit more work to implement this, but if the use case
> > is common enough, it might be worthwhile.
> >
> > -Jason
> >
> > On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hey Jens,
> > >
> > > The purpose of pause() is to stop fetches for a set of partitions.
> > > This lets you continue calling poll() to send heartbeats. Also note
> > > that poll() generally only blocks for rebalances.
> > > In code, something like this is what I was thinking:
> > >
> > > while (running) {
> > >     ConsumerRecords<K, V> records = consumer.poll(1000);
> > >     if (queue.offer(records))
> > >         continue;
> > >
> > >     TopicPartition[] assignment = toArray(consumer.assignment());
> > >     consumer.pause(assignment);
> > >     while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > >         consumer.poll(0);
> > >     consumer.resume(assignment);
> > > }
> > >
> > > The tricky thing is handling rebalances, since they might occur in
> > > either call to poll(). In a rebalance, you have to 1) drain the queue,
> > > 2) commit current offsets, and 3) maybe break from the inner poll
> > > loop. If the processing thread is busy when the rebalance is
> > > triggered, then you may have to discard the results when it's
> > > finished. It's also a little difficult communicating completion to the
> > > poll loop, which is where the offset commit needs to take place. I
> > > suppose another queue would work, sigh.
> > >
> > > Well, I think you can make that work, but I tend to agree that it's
> > > pretty complicated. Perhaps instead of a queue, you should just submit
> > > the processor to an executor service for each record set returned and
> > > await its completion directly. For example:
> > >
> > > while (running) {
> > >     ConsumerRecords<K, V> records = consumer.poll(1000);
> > >     Future<?> future = executor.submit(new Processor(records));
> > >
> > >     TopicPartition[] assignment = toArray(consumer.assignment());
> > >     consumer.pause(assignment);
> > >     while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > >         consumer.poll(0);
> > >     consumer.resume(assignment);
> > >     consumer.commitSync();
> > > }
> > >
> > > This seems closer to the spirit of the poll loop, and it makes
> > > handling commits a lot easier. You still have to deal with the
> > > rebalance problem, but at least you don't have to deal with the queue.
> > > It's still a little complex, though. Maybe the consumer needs a
> > > ping() API which does the same thing as poll() but doesn't send or
> > > return any fetches. That would simplify things a little more:
> > >
> > > while (running) {
> > >     ConsumerRecords<K, V> records = consumer.poll(1000);
> > >     Future<?> future = executor.submit(new Processor(records));
> > >     while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> > >         consumer.ping();
> > >     consumer.commitSync();
> > > }
> > >
> > > Anyway, I'll think about it a little more and see if any other
> > > approaches come to mind. I do agree that we should have a way to
> > > handle this case without too much extra work.
> > >
> > > -Jason
> > >
> > > On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil <jens.ran...@tink.se>
> > > wrote:
> > >
> > >> Hi Jason,
> > >>
> > >> Thanks for your response. See replies inline:
> > >>
> > >> On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io>
> > >> wrote:
> > >>
> > >> > Hey Jens,
> > >> >
> > >> > I'm not sure I understand why increasing the session timeout is
> > >> > not an option. Is the issue that there's too much uncertainty
> > >> > about processing time to set an upper bound for each round of the
> > >> > poll loop?
> > >>
> > >> Yes, that's the issue.
> > >>
> > >> > One general workaround would be to move the processing into
> > >> > another thread. For example, you could add messages to a blocking
> > >> > queue and have the processor poll them. We have a pause() API
> > >> > which can be used to prevent additional fetches when the queue is
> > >> > full. This would allow you to continue the poll loop instead of
> > >> > blocking in offer(). The tricky thing would be handling
> > >> > rebalances, since you'd need to clear the queue and get the last
> > >> > offset from the processors.
> > >> > Enough people will probably end up doing something like this
> > >> > that we should probably work through an example of how to do it
> > >> > properly.
> > >>
> > >> Hm, as far as I've understood, the consumer will only send
> > >> heartbeats to the broker when poll() is being called. If I were to
> > >> call pause() on a consumer (from a separate thread), I understand
> > >> poll() will block indefinitely. Will the polling consumer still send
> > >> heartbeats when blocked? Or would a pause for too long (while my
> > >> records are being processed) eventually lead to a session timeout?
> > >> If the latter, that would sort of defeat the purpose, since I am
> > >> trying to avoid unnecessary rebalancing of consumers when there is
> > >> high pressure on the consumers.
> > >>
> > >> Regarding the handling of rebalancing for a queue solution like you
> > >> describe: it really sounds very complicated. It's probably doable,
> > >> but doesn't this sort of defeat the purpose of the high-level
> > >> consumer API? I mean, it sounds like it should gracefully handle
> > >> slow consumption of varying size. I might be wrong.
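(Aside: the complete(future, timeout, unit) helper used in Jason's sketches above isn't part of any standard API. A minimal version, assuming it should simply report whether the future finished within the timeout, might look like this:)

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Main {
    // Returns true once the future has finished; false if the timeout
    // elapses first, so the caller can poll()/ping() and retry.
    static boolean complete(Future<?> future, long timeout, TimeUnit unit)
            throws InterruptedException {
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false; // not done yet; keep heartbeating
        } catch (ExecutionException e) {
            // processing failed; surface the cause to the poll loop
            throw new RuntimeException("processing failed", e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println(complete(pending, 10, TimeUnit.MILLISECONDS)); // false
        pending.complete(null);
        System.out.println(complete(pending, 10, TimeUnit.MILLISECONDS)); // true
    }
}
```

(Returning false on timeout is what lets the outer loop keep calling poll(0) to heartbeat while the processor catches up.)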
> > >>
> > >> Thanks,
> > >> Jens
> > >>
> > >> --
> > >> Jens Rantil
> > >> Backend engineer
> > >> Tink AB
> > >>
> > >> Email: jens.ran...@tink.se
> > >> Phone: +46 708 84 18 32
> > >> Web: www.tink.se
> > >>
> > >> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> > >> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> > >> Twitter <https://twitter.com/tink>
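(For reference, the "max records" cap discussed in this thread later shipped as the max.poll.records consumer configuration via KIP-41. Until then, it can be approximated on the client side by slicing each fetched batch into bounded chunks and heartbeating between chunks. A broker-free sketch, where a plain List stands in for ConsumerRecords and split/maxRecords are illustrative names, not Kafka APIs:)

```java
import java.util.ArrayList;
import java.util.List;

public class Main {
    // Splits one fetched batch into slices of at most maxRecords records,
    // so the caller can call poll(0) (or a future ping()) between slices
    // to keep the session alive while processing slowly.
    static <T> List<List<T>> split(List<T> records, int maxRecords) {
        List<List<T>> slices = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxRecords) {
            slices.add(records.subList(i, Math.min(i + maxRecords, records.size())));
        }
        return slices;
    }

    public static void main(String[] args) {
        // A "fetch" of five records, processed at most two at a time.
        System.out.println(split(List.of(1, 2, 3, 4, 5), 2)); // [[1, 2], [3, 4], [5]]
    }
}
```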