Apologies if this has been discussed already... The ‘Manual Offset Control’ section of the KafkaConsumer javadocs has some code that looks like this:
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
        if (buffer.size() >= commitInterval) {
            insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }

From my reading of the docs on commitSync(), this seems like a fairly unsafe thing to do. In particular, the consumer is calling commitSync() in the middle of processing a bunch of records. If it dies immediately after that commit, it will pick up processing on restart *after* the remaining unprocessed records.

It would seem that this example should be using commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets) which, of course, would require a bit more bookkeeping of what was the last processed offset for each partition of each topic in the current set of records.

Or, perhaps this example should be reworked to not be committing offsets until after it has processed all the records?

---
Richard Lee
Principal Engineer
Office of the CTO
TiVo Inc.
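A rough sketch of the per-partition bookkeeping described above, offered as an illustration under stated assumptions rather than a definitive fix. It avoids the Kafka classes so that it stands alone: Rec is a hypothetical stand-in for ConsumerRecord, OffsetTracker and commitsFor are names invented here, and the String key stands in for TopicPartition. With the real client, the tracked map would be a Map<TopicPartition, OffsetAndMetadata> passed to commitSync(offsets), and the value committed for each partition would be the last processed offset plus one, because a committed offset names the next record to read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConsumerRecord: only the fields the bookkeeping needs.
final class Rec {
    final String topic;
    final int partition;
    final long offset;

    Rec(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical helper that remembers, per topic-partition, the next offset to read.
final class OffsetTracker {
    // Key "topic-partition" stands in for TopicPartition (an assumption of this sketch).
    private final Map<String, Long> nextOffsets = new HashMap<>();

    // Record that r has been fully processed: the offset to commit is r.offset + 1.
    void markProcessed(Rec r) {
        nextOffsets.merge(r.topic + "-" + r.partition, r.offset + 1, Long::max);
    }

    // Copy of the current state, i.e. what would be handed to commitSync(offsets).
    Map<String, Long> snapshot() {
        return new HashMap<>(nextOffsets);
    }

    // Mirrors the loop from the javadoc example, but returns the offsets that
    // would be committed at each flush instead of committing the whole poll.
    static List<Map<String, Long>> commitsFor(List<Rec> polled, int commitInterval) {
        OffsetTracker tracker = new OffsetTracker();
        List<Rec> buffer = new ArrayList<>();
        List<Map<String, Long>> commits = new ArrayList<>();
        for (Rec r : polled) {
            buffer.add(r);
            if (buffer.size() >= commitInterval) {
                // insertIntoDb(buffer) would run here in the real loop.
                for (Rec done : buffer) {
                    tracker.markProcessed(done);
                }
                // Real code: consumer.commitSync(offsets) with the equivalent map.
                commits.add(tracker.snapshot());
                buffer.clear();
            }
        }
        return commits;
    }
}
```

Only records that were actually flushed to the database are marked as processed, so anything still sitting in the buffer or not yet iterated would be re-delivered after a crash rather than skipped.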