I found that this was already reported here https://issues.apache.org/jira/browse/KAFKA-5731 and fixed!
-- Shrijeet

On Thu, Sep 7, 2017 at 6:04 PM, Shrijeet Paliwal <shrijeet.pali...@gmail.com> wrote:

> *Kafka Version: 0.10.2.1*
>
> Hi,
>
> I am running a custom connector (in distributed mode) and noticed that
> the lag on one of the partitions is increasing consistently, although the
> partition is assigned to a connect worker. Log messages in the connect
> log follow:
>
> [DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onPartitionsAssigned - foo.sink-16 assigned topic partition bar-1 with offset 1871179
> [DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask commitOffsets - WorkerSinkTask{id=foo.sink-16} Skipping offset commit, no change since last commit
> [DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onCommitCompleted - Finished WorkerSinkTask{id=foo.sink-16} offset commit successfully in 0 ms
> [DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onCommitCompleted - Got callback for timed out commit WorkerSinkTask{id=foo.sink-16}: 2, but most recent commit is 4
> [WARN] 2017-09-07 14:32:54,573 runtime.WorkerSinkTask commitOffsets - Ignoring invalid task provided offset bar-1/OffsetAndMetadata{offset=1871179, metadata=''} -- partition not assigned
>
> Looking at the code, it seems a callback from a previous async offset
> commit could reset lastCommittedOffsets to an incorrect state, causing all
> subsequent commits to be ignored (see logs above).
> Relevant code snippets are:
>
>     private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) {
>         log.info("{} Committing offsets", this);
>         if (closing) {
>             doCommitSync(offsets, seqno);
>         } else {
>             OffsetCommitCallback cb = new OffsetCommitCallback() {
>                 @Override
>                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
>                     if (error == null) {
>                         *lastCommittedOffsets = offsets;*
>                     }
>                     onCommitCompleted(error, seqno);
>                 }
>             };
>             consumer.commitAsync(offsets, cb);
>         }
>     }
>
> *---*
>
>     private void commitOffsets(long now, boolean closing) {
>         <snip>
>         .....
>         final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
>         *if (commitableOffsets.containsKey(partition)) {*
>             if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
>                 commitableOffsets.put(partition, taskProvidedOffset);
>             } else {
>                 log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed", partition, taskProvidedOffset);
>             }
>         } else {
>             *log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned", partition, taskProvidedOffset);*
>         }
>         <snip>
>         ...
>     }
>
> *-----*
>
> Is my understanding correct that this looks like a bug?
>
> --
> Shrijeet
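
For anyone who wants to see the ordering problem in isolation, here is a minimal sketch in plain Java with no Kafka dependency. It is not the WorkerSinkTask code and not necessarily the change made for KAFKA-5731. The class StaleCommitSketch, the guardStale switch, the string partition keys, and the bar-0 partition with offset 100 are all made up for illustration; only the names lastCommittedOffsets, commitSeqno, bar-1, and offset 1871179 come from the message above. The sketch assumes that a rebalance resets lastCommittedOffsets to the newly assigned partitions, and that a callback from an older async commit can still run afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the sink task; partitions are plain strings here.
final class StaleCommitSketch {
    private Map<String, Long> lastCommittedOffsets = new HashMap<>();
    private int commitSeqno = 0;        // sequence number of the most recent commit
    private final boolean guardStale;   // hypothetical switch: drop out-of-order callbacks

    StaleCommitSketch(boolean guardStale) {
        this.guardStale = guardStale;
    }

    // Starts an "async" commit and returns the callback, so the caller
    // decides when (and in which order) it completes.
    Runnable commitAsync(Map<String, Long> offsets) {
        final int seqno = ++commitSeqno;
        final Map<String, Long> snapshot = new HashMap<>(offsets);
        return () -> {
            if (guardStale && seqno != commitSeqno) {
                return; // a newer commit was issued; ignore this late callback
            }
            lastCommittedOffsets = snapshot; // same assignment as in doCommit above
        };
    }

    // Assumption: a rebalance replaces the tracked offsets with the new assignment.
    void onPartitionsAssigned(Map<String, Long> assigned) {
        lastCommittedOffsets = new HashMap<>(assigned);
    }

    // Mirrors the containsKey check in commitOffsets above.
    boolean canCommit(String partition) {
        return lastCommittedOffsets.containsKey(partition);
    }

    // Replays: commit in flight -> rebalance -> newer commit -> late callback.
    public static boolean partitionKnownAfterStaleCallback(boolean guardStale) {
        StaleCommitSketch task = new StaleCommitSketch(guardStale);

        Map<String, Long> before = new HashMap<>();
        before.put("bar-0", 100L);                           // made-up old assignment
        task.onPartitionsAssigned(before);
        Runnable staleCallback = task.commitAsync(before);   // seqno 1, still in flight

        Map<String, Long> after = new HashMap<>();
        after.put("bar-1", 1871179L);                        // assignment from the log
        task.onPartitionsAssigned(after);
        task.commitAsync(after).run();                       // seqno 2 completes first

        staleCallback.run();                                 // seqno 1 arrives late
        return task.canCommit("bar-1");
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + partitionKnownAfterStaleCallback(false));
        System.out.println("guarded:   " + partitionKnownAfterStaleCallback(true));
    }
}
```

Without the guard, the late callback overwrites lastCommittedOffsets with a map that does not contain bar-1, so the containsKey check fails and every later commit for that partition lands in the "partition not assigned" branch. With the guard, the late callback is dropped and bar-1 stays known.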