Apologies if this has been discussed already...

The ‘Manual Offset Control’ section of the KafkaConsumer javadocs has some code 
that looks like this:

         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
             if (buffer.size() >= commitInterval) {
                 insertIntoDb(buffer);
                 consumer.commitSync();
                 buffer.clear();
             }
         }

From my reading of the docs on commitSync(), this seems like an unsafe thing 
to do.  The no-argument commitSync() commits the offsets returned by the last 
poll(), i.e. the position just past the *last* record in 'records', yet here it 
is called in the middle of iterating over that batch.  If the consumer dies 
immediately after that commit, on restart it will resume *after* the records 
it had not yet added to the buffer, so those records are never processed.
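
To make the failure concrete, here is a rough, hypothetical model in plain 
Java (no Kafka dependency; SkippedOnCrash and skipped() are made-up names) of 
one poll() batch from a single partition going through the loop above.  It 
assumes the buffer is empty when the batch arrives and that the process dies 
right after the first commitSync().

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Kafka code) of one poll() batch from a single
 * partition running through the quoted loop. Assumes the buffer is empty when
 * the batch arrives and the process dies right after the first commitSync().
 */
class SkippedOnCrash {

    /** Offsets in the batch that were neither inserted into the DB nor will be re-read. */
    static List<Long> skipped(List<Long> batchOffsets, int commitInterval) {
        if (batchOffsets.size() < commitInterval) {
            // No commit happens inside this batch, so a restart re-reads all of it.
            return List.of();
        }
        // poll() has already advanced the position past the whole batch, and
        // commitSync() with no arguments commits that position (last offset + 1).
        // Only the first commitInterval records reached insertIntoDb() before the
        // crash, and a restart resumes at the committed position, so every later
        // record in the batch is silently skipped.
        return new ArrayList<>(batchOffsets.subList(commitInterval, batchOffsets.size()));
    }
}
```

For example, with a batch at offsets 0 to 4 and a commitInterval of 2, the 
commit stores offset 5 while only 0 and 1 have reached the database, so 2, 3 
and 4 are lost.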

It would seem that this example should be using 

         commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets)

which, of course, would require a bit more bookkeeping: tracking the last 
processed offset for each topic-partition in the current set of records, and 
committing that offset + 1, since the committed offset is the next one the 
consumer will read.
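
A minimal sketch of that bookkeeping, written as plain Java so it runs without 
the Kafka client; OffsetBookkeeping, TopicPart and markProcessed() are 
hypothetical names.  In real code the key would be 
new TopicPartition(record.topic(), record.partition()), the value 
new OffsetAndMetadata(record.offset() + 1), and the resulting map would be 
passed to consumer.commitSync(offsets) only after insertIntoDb(buffer) succeeds.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (plain Java, no Kafka dependency) of the per-partition
 * bookkeeping needed for commitSync(Map). TopicPart stands in for
 * TopicPartition and the Long value stands in for OffsetAndMetadata.
 */
class OffsetBookkeeping {

    /** Stand-in for org.apache.kafka.common.TopicPartition. */
    record TopicPart(String topic, int partition) {}

    private final Map<TopicPart, Long> nextOffsets = new HashMap<>();

    /** Call once a record has actually been written to the database. */
    void markProcessed(String topic, int partition, long offset) {
        // The committed offset must be the next offset to read, so store
        // offset + 1; Long::max keeps the furthest point if calls arrive out of order.
        nextOffsets.merge(new TopicPart(topic, partition), offset + 1, Long::max);
    }

    /** Snapshot of what to commit; real code would build the commitSync(Map) argument from this. */
    Map<TopicPart, Long> toCommit() {
        return Map.copyOf(nextOffsets);
    }
}
```

Because only records that have really been inserted are marked, a crash after 
the commit can at worst replay records, never skip them.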

Or perhaps this example should be reworked so that it does not commit offsets 
until it has processed all the records returned by poll()?
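
That second option can be checked with another hypothetical single-partition 
model (plain Java, not Kafka code; CommitAfterBatch and lostOffsets() are 
made-up names).  Every record from a poll() goes into the buffer first, and 
insertIntoDb() plus the no-argument commitSync() run only after the for-loop.  
The model crashes right after the crashAfter-th commit, restarts from the 
committed offset with an empty buffer, and reports any offsets that never 
reached the database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical single-partition model (not Kafka code) of a loop that buffers
 * the whole poll() batch before committing. Crashes right after the
 * crashAfter-th commit, then restarts from the committed offset.
 */
class CommitAfterBatch {

    /** Returns the offsets in [0, logSize) that never reached the database. */
    static Set<Long> lostOffsets(long logSize, int maxPoll, int commitInterval, int crashAfter) {
        Set<Long> inDb = new TreeSet<>();
        long committed = run(0, logSize, maxPoll, commitInterval, crashAfter, inDb);
        run(committed, logSize, maxPoll, commitInterval, Integer.MAX_VALUE, inDb);
        Set<Long> lost = new TreeSet<>();
        for (long offset = 0; offset < logSize; offset++) {
            if (!inDb.contains(offset)) {
                lost.add(offset);
            }
        }
        return lost;
    }

    /** Runs the loop from start; returns the last committed offset. */
    private static long run(long start, long logSize, int maxPoll, int commitInterval,
                            int crashAfter, Set<Long> inDb) {
        long position = start;
        long committed = start;
        int commits = 0;
        List<Long> buffer = new ArrayList<>(); // in-memory, so lost on a crash
        while (position < logSize) {
            // poll(): hand back up to maxPoll offsets and advance the position.
            List<Long> records = new ArrayList<>();
            while (records.size() < maxPoll && position < logSize) {
                records.add(position++);
            }
            for (long record : records) {
                buffer.add(record);
            }
            // Every record below position is now in the buffer, so committing
            // the position right after the insert cannot skip anything.
            if (buffer.size() >= commitInterval) {
                inDb.addAll(buffer);   // insertIntoDb(buffer)
                committed = position;  // commitSync()
                buffer.clear();
                if (++commits == crashAfter) {
                    return committed;  // simulated crash right after the commit
                }
            }
        }
        // The model's log is finite: flush the remainder so that only offsets
        // that were truly skipped are reported as lost.
        inDb.addAll(buffer);
        return committed;
    }
}
```

Like the original, this is still at-least-once: a crash between insertIntoDb() 
and commitSync() replays the last buffer, so the DB insert should tolerate 
duplicates.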

---
Richard Lee
Principal Engineer
Office of the CTO
TiVo Inc.
