Hey Richard,

Yeah, I think you're right. This looks like the same issue as KAFKA-2478,
which appears to have been forgotten about. I'll see if we can get the
patch merged.

-Jason

On Mon, Jan 11, 2016 at 4:27 PM, Richard Lee <rd...@tivo.com> wrote:

> Apologies if this has been discussed already...
>
> The ‘Manual Offset Control’ section of the KafkaConsumer javadocs has some
> code that looks like this:
>
>          ConsumerRecords<String, String> records = consumer.poll(100);
>          for (ConsumerRecord<String, String> record : records) {
>              buffer.add(record);
>              if (buffer.size() >= commitInterval) {
>                  insertIntoDb(buffer);
>                  consumer.commitSync();
>                  buffer.clear();
>              }
>          }
>
> From my reading of the docs on commitSync(), this seems like a fairly
> unsafe thing to do. With no arguments, commitSync() commits the offsets
> returned by the last poll() for all subscribed partitions, but here it is
> called in the middle of processing that batch. If the consumer dies
> immediately after the commit, on restart it will resume *after* the records
> from that batch that were never processed.
>
> It would seem that this example should be using
>
>          commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets)
>
> which, of course, would require a bit more bookkeeping: tracking the last
> processed offset for each partition of each topic in the current set of
> records, and committing that offset + 1, since the committed offset is the
> next one the consumer will read.
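A minimal sketch of that bookkeeping, written in plain Java so it runs without a broker. The `Rec` class is a hypothetical stand-in for `ConsumerRecord` (only topic, partition and offset), and the "topic-partition" `String` key stands in for `TopicPartition`. The rule it encodes is Kafka's convention that the committed offset is the next offset to consume, i.e. the last processed offset + 1. In a real consumer, each entry would instead become `new TopicPartition(topic, partition)` mapped to `new OffsetAndMetadata(offset)` in the map handed to `commitSync(...)`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OffsetBookkeeping {

    // Hypothetical stand-in for ConsumerRecord: only the fields needed for commits.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For every partition seen in the processed records, return the offset to commit:
    // the highest processed offset + 1 (the next record the group should consume).
    // The "topic-partition" String key stands in for TopicPartition.
    static Map<String, Long> nextOffsetsToCommit(List<Rec> processed) {
        Map<String, Long> toCommit = new LinkedHashMap<>();
        for (Rec r : processed) {
            String key = r.topic + "-" + r.partition;
            long next = r.offset + 1;
            // Offsets within one partition only grow, but keep the max to be safe.
            toCommit.merge(key, next, Long::max);
        }
        return toCommit;
    }

    public static void main(String[] args) {
        // Only records that were actually written to the DB go into the buffer,
        // so the computed commit can never skip past an unprocessed record.
        List<Rec> buffer = new ArrayList<>();
        buffer.add(new Rec("foo", 0, 41));
        buffer.add(new Rec("foo", 0, 42));
        buffer.add(new Rec("foo", 1, 7));
        System.out.println(nextOffsetsToCommit(buffer)); // prints {foo-0=43, foo-1=8}
    }
}
```

Because the map is built from the buffer rather than from the consumer's position, it stays correct even when the buffer is flushed in the middle of a poll's records.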
>
> Or perhaps this example should be reworked so that it doesn't commit
> offsets until it has processed all the records returned by the poll?
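A broker-free sketch of that second option. `FakeConsumer` is a hypothetical single-partition model, not the Kafka API: `poll()` advances the position past the whole batch, and `commitSync()` commits that position, mirroring what the no-argument `commitSync()` does. `worstSkip` runs the quoted loop with the size check either inside the `for` loop (as in the javadoc) or after it, and reports how far the committed offset ever got ahead of the records actually stored.

```java
import java.util.ArrayList;
import java.util.List;

class CommitAfterBatch {

    // Hypothetical single-partition consumer model (offsets start at 0).
    static final class FakeConsumer {
        private final long end;
        private final int batchSize;
        long position = 0;
        long committed = 0;

        FakeConsumer(long end, int batchSize) {
            this.end = end;
            this.batchSize = batchSize;
        }

        // Hands out the next batch and moves the position past all of it.
        List<Long> poll() {
            List<Long> batch = new ArrayList<>();
            while (batch.size() < batchSize && position < end) {
                batch.add(position++);
            }
            return batch;
        }

        // Like the no-arg commitSync(): commits the position from the last poll().
        void commitSync() {
            committed = position;
        }
    }

    // Tracks what has been "inserted into the DB" and the worst skip seen at commit time.
    static final class Sink {
        long stored = 0;
        long worstSkip = 0;

        // Stands in for insertIntoDb(buffer); consumer.commitSync(); buffer.clear();
        void flushAndCommit(List<Long> buffer, FakeConsumer consumer) {
            stored += buffer.size();
            consumer.commitSync();
            // Committed-but-unstored records would be skipped after a crash here.
            worstSkip = Math.max(worstSkip, consumer.committed - stored);
            buffer.clear();
        }
    }

    static long worstSkip(boolean checkInsideLoop, long end, int pollSize, int commitInterval) {
        FakeConsumer consumer = new FakeConsumer(end, pollSize);
        Sink sink = new Sink();
        List<Long> buffer = new ArrayList<>();
        List<Long> records;
        while (!(records = consumer.poll()).isEmpty()) {
            for (Long record : records) {
                buffer.add(record);
                if (checkInsideLoop && buffer.size() >= commitInterval) {
                    sink.flushAndCommit(buffer, consumer);
                }
            }
            // Reworked shape: only commit once every record from this poll is buffered.
            if (!checkInsideLoop && buffer.size() >= commitInterval) {
                sink.flushAndCommit(buffer, consumer);
            }
        }
        return sink.worstSkip;
    }

    public static void main(String[] args) {
        // 10 records, polls of 4, commit every 3 buffered records.
        System.out.println("check inside for-loop: worst skip = " + worstSkip(true, 10, 4, 3));
        System.out.println("check after for-loop:  worst skip = " + worstSkip(false, 10, 4, 3));
    }
}
```

With these parameters the quoted shape reports a worst skip of 2 while the reworked shape reports 0. The trade-off is that the buffer can grow past `commitInterval` by nearly a full poll's worth of records, and anything still buffered when the consumer dies is redelivered on restart (at-least-once) instead of being skipped.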
>
> ---
> Richard Lee
> Principal Engineer
> Office of the CTO
> TiVo Inc.
>
>
