*Kafka Version: 0.10.2.1*

Hi,
I am running a custom sink connector in distributed mode and noticed that the lag on one partition keeps increasing, even though the partition is assigned to a Connect worker. The relevant messages from the Connect log follow:

```
[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onPartitionsAssigned - foo.sink-16 assigned topic partition bar-1 with offset 1871179
[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask commitOffsets - WorkerSinkTask{id=foo.sink-16} Skipping offset commit, no change since last commit
[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onCommitCompleted - Finished WorkerSinkTask{id=foo.sink-16} offset commit successfully in 0 ms
[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onCommitCompleted - Got callback for timed out commit WorkerSinkTask{id=foo.sink-16}: 2, but most recent commit is 4
[WARN] 2017-09-07 14:32:54,573 runtime.WorkerSinkTask commitOffsets - Ignoring invalid task provided offset bar-1/OffsetAndMetadata{offset=1871179, metadata=''} -- partition not assigned
```

Looking at the code, it seems that a callback from an earlier asynchronous offset commit can reset `lastCommittedOffsets` to an incorrect state. This causes all subsequent commits to be ignored (see the logs above).

The callback assigns `lastCommittedOffsets = offsets` whenever `error == null`. It does this before `onCommitCompleted` compares the sequence numbers. In the logs, the callback for commit 2 arrives after `bar-1` was assigned, while the most recent commit is already 4. If the offsets carried by that stale callback predate the assignment, they have no entry for `bar-1`. `commitOffsets` then rejects the task-provided offset with "partition not assigned".

The relevant code snippets are:

```java
private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) {
    log.info("{} Committing offsets", this);
    if (closing) {
        doCommitSync(offsets, seqno);
    } else {
        OffsetCommitCallback cb = new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
                if (error == null) {
                    // Suspect line: overwrites state even when this callback belongs to an older commit
                    lastCommittedOffsets = offsets;
                }
                onCommitCompleted(error, seqno);
            }
        };
        consumer.commitAsync(offsets, cb);
    }
}
```

```java
private void commitOffsets(long now, boolean closing) {
    // <snip> .....
    final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
    // Suspect check: a partition missing from lastCommittedOffsets is treated as not assigned
    if (commitableOffsets.containsKey(partition)) {
        if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
            commitableOffsets.put(partition, taskProvidedOffset);
        } else {
            log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed", partition, taskProvidedOffset);
        }
    } else {
        // This is the warning seen in the logs for bar-1
        log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned", partition, taskProvidedOffset);
    }
    // <snip> ...
}
```

Is my understanding correct that this looks like a bug?

-- Shrijeet
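The sequence in the logs above can be replayed without a broker to check the reasoning. The sketch below is a simplified, hypothetical model and not `WorkerSinkTask` itself. `StaleCommitRace`, `beginCommit`, `assign`, `accepts` and `replay` are made-up names, and partitions are plain strings. The model assumes that a rebalance rebuilds `lastCommittedOffsets` from the new assignment. The `guardBySeqno` flag shows one possible mitigation: apply a callback's offsets only if its `seqno` is still the latest. That guard is an assumption for illustration, not a confirmed fix.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Kafka-free model of the race: partitions are strings, offsets are longs.
public class StaleCommitRace {
    private Map<String, Long> lastCommittedOffsets = new HashMap<>();
    private int commitSeqno = 0;
    private final boolean guardBySeqno;

    StaleCommitRace(boolean guardBySeqno) {
        this.guardBySeqno = guardBySeqno;
    }

    // Starts an asynchronous commit and returns its sequence number.
    int beginCommit() {
        return ++commitSeqno;
    }

    // Assumed model of onPartitionsAssigned: state is rebuilt from the new positions.
    void assign(Map<String, Long> positions) {
        lastCommittedOffsets = new HashMap<>(positions);
    }

    // Models a successful commitAsync callback (error == null).
    void onComplete(Map<String, Long> committed, int seqno) {
        // Unguarded: overwrite unconditionally, like the quoted callback.
        // Guarded (hypothetical): ignore callbacks from superseded commits.
        if (!guardBySeqno || seqno == commitSeqno) {
            lastCommittedOffsets = committed;
        }
    }

    // Models the containsKey check in commitOffsets.
    boolean accepts(String partition) {
        return lastCommittedOffsets.containsKey(partition);
    }

    // Replays the order seen in the logs; returns whether bar-1's offset is accepted.
    static boolean replay(boolean guardBySeqno) {
        StaleCommitRace task = new StaleCommitRace(guardBySeqno);

        Map<String, Long> oldPositions = new HashMap<>();
        oldPositions.put("bar-0", 100L);
        task.assign(oldPositions);

        // A commit for the old assignment is sent, but its callback is delayed.
        Map<String, Long> staleCommit = new HashMap<>(oldPositions);
        int staleSeqno = task.beginCommit();

        // Later commits move the sequence number past the stale one.
        task.beginCommit();
        task.beginCommit();

        // Rebalance: bar-1 is assigned.
        Map<String, Long> newPositions = new HashMap<>(oldPositions);
        newPositions.put("bar-1", 1871179L);
        task.assign(newPositions);

        // The delayed callback finally arrives without an error.
        task.onComplete(staleCommit, staleSeqno);

        return task.accepts("bar-1");
    }

    public static void main(String[] args) {
        System.out.println("unguarded accepts bar-1: " + replay(false)); // false
        System.out.println("guarded accepts bar-1: " + replay(true));    // true
    }
}
```

With the unguarded callback, the stale map overwrites the rebuilt state, and `bar-1` is rejected. This matches the "partition not assigned" warning. With the hypothetical guard, the late callback is dropped and `bar-1` stays committable.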