Hi again Jason,

Sorry for a bit of a late response - I'm travelling and check my e-mail
spuriously.

I have a specific question regarding they pause solution quoted below:

On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io> wrote:
>
> while (running) {
>   ConsumerRecords<K, V> records = consumer.poll(1000);
>   if (queue.offer(records))
>     continue;
>
>   TopicPartition[] assignment = toArray(consumer.assignment());
>   consumer.pause(assignment);
>   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
>     consumer.poll(0);
>   consumer.resume(assignment);
> }
>

As far as I've understood, the `KafkaConsumer` has a background thread that
fetches records, right? If so, isn't there a race condition between the
`consumer.poll(1000);` call and `consumer.pause(assignment);` where the
fetcher might fetch, and commit, messages that I then collect on my first
`consumer.poll(0);` call? Since `consumer.poll(0);` then would return a
non-empty list, I would essentially ignoring messages? Or is the pause()
call both 1) making sure consumer#poll never returns anything _and_ 2)
pauses the background fetcher?

Cheers,
Jens


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>

Reply via email to