Hey Richard,

Yeah, I think you're right. This looks like the same issue as KAFKA-2478, which appears to have been forgotten about. I'll see if we can get the patch merged.

-Jason

On Mon, Jan 11, 2016 at 4:27 PM, Richard Lee <rd...@tivo.com> wrote:

> Apologies if this has been discussed already...
>
> The ‘Manual Offset Control’ section of the KafkaConsumer javadocs has some
> code that looks like this:
>
>     ConsumerRecords<String, String> records = consumer.poll(100);
>     for (ConsumerRecord<String, String> record : records) {
>         buffer.add(record);
>         if (buffer.size() >= commitInterval) {
>             insertIntoDb(buffer);
>             consumer.commitSync();
>             buffer.clear();
>         }
>     }
>
> From my reading of the docs on commitSync(), this seems like a fairly
> unsafe thing to do. In particular, the consumer is calling commitSync() in
> the middle of processing a bunch of records. If it dies immediately after
> that commit, it will pick up processing on restart *after* the remaining
> unprocessed records.
>
> It would seem that this example should be using
>
>     commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets)
>
> which, of course, would require a bit more bookkeeping of what was the
> last processed offset for each partition of each topic in the current set
> of records.
>
> Or, perhaps this example should be reworked to not be committing offsets
> until after it has processed all the records?
>
> ---
> Richard Lee
> Principal Engineer
> Office of the CTO
> TiVo Inc.
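A minimal sketch of the per-partition bookkeeping Richard describes, written in plain Java so it compiles without kafka-clients on the classpath. `PartitionKey` and `OffsetTracker` are hypothetical names, not part of the Kafka API; `PartitionKey` only stands in for `TopicPartition`. The one detail taken from the Kafka docs is that the committed offset should be the next offset to consume, i.e. the last processed offset + 1.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Kafka's TopicPartition (topic name + partition number).
final class PartitionKey {
    final String topic;
    final int partition;

    PartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PartitionKey)) return false;
        PartitionKey other = (PartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical helper: remembers, per partition, the offset that is safe to
// commit because every record before it has been durably processed.
final class OffsetTracker {
    private final Map<PartitionKey, Long> nextOffsets = new HashMap<>();

    // Call only AFTER the record at `offset` has been written (e.g. to the DB).
    // Stores offset + 1, the next offset to read; Long::max keeps the highest
    // value seen so an out-of-order call cannot move the position backwards.
    void markProcessed(String topic, int partition, long offset) {
        nextOffsets.merge(new PartitionKey(topic, partition), offset + 1, Long::max);
    }

    // Read-only copy of the offsets that are safe to commit right now.
    Map<PartitionKey, Long> safeToCommit() {
        return Collections.unmodifiableMap(new HashMap<>(nextOffsets));
    }
}
```

In the javadoc loop, each record in `buffer` would be marked only after `insertIntoDb(buffer)` returns, and the snapshot would then be converted to a `Map<TopicPartition, OffsetAndMetadata>` (using `new OffsetAndMetadata(nextOffset)`) and passed to `consumer.commitSync(Map)`. That way a crash right after the commit never skips records that were fetched but not yet inserted.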