I found that this has already been reported and fixed:
https://issues.apache.org/jira/browse/KAFKA-5731
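
For anyone who finds this thread later, here is a minimal sketch of the sequence I think the logs in my original message show. It is not the Connect code: StaleCommitDemo, replay, and the guardBySeqno switch are names I made up, plain strings and longs stand in for TopicPartition and OffsetAndMetadata, and the bar-0 partition and its offset are invented. The sequence-number guard is just one way to ignore a late callback; I have not checked that it matches the actual change in KAFKA-5731.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the sink task's commit bookkeeping.
// Strings and longs stand in for TopicPartition and OffsetAndMetadata.
class StaleCommitDemo {
    Map<String, Long> lastCommittedOffsets = new HashMap<>();
    int commitSeqno = 0;         // sequence number of the most recent commit
    final boolean guardBySeqno;  // hypothetical switch: ignore stale callbacks

    StaleCommitDemo(boolean guardBySeqno) {
        this.guardBySeqno = guardBySeqno;
    }

    // Models a rebalance: committed state is reset to the new assignment.
    void onPartitionsAssigned(Map<String, Long> assigned) {
        lastCommittedOffsets = new HashMap<>(assigned);
    }

    // Models issuing an async commit; returns the seqno its callback will carry.
    int startAsyncCommit() {
        return ++commitSeqno;
    }

    // Models the async commit callback completing without an error.
    void onAsyncCommitSuccess(Map<String, Long> offsets, int seqno) {
        if (guardBySeqno && seqno != commitSeqno) {
            return; // stale callback: leave lastCommittedOffsets untouched
        }
        lastCommittedOffsets = offsets;
    }

    // Mirrors the containsKey check made before accepting a task-provided offset.
    boolean acceptsTaskOffset(String partition) {
        return lastCommittedOffsets.containsKey(partition);
    }

    // Replays: commit on the old assignment, rebalance, newer commit, late callback.
    // Returns whether an offset for bar-1 would still be accepted afterwards.
    static boolean replay(boolean guardBySeqno) {
        StaleCommitDemo task = new StaleCommitDemo(guardBySeqno);

        Map<String, Long> oldOffsets = new HashMap<>();
        oldOffsets.put("bar-0", 100L); // invented partition and offset
        task.onPartitionsAssigned(oldOffsets);
        int staleSeqno = task.startAsyncCommit(); // its callback is delayed

        Map<String, Long> newAssignment = new HashMap<>();
        newAssignment.put("bar-1", 1871179L);
        task.onPartitionsAssigned(newAssignment);
        task.startAsyncCommit(); // a newer commit supersedes the delayed one

        // The delayed callback finally arrives, carrying the old offsets.
        task.onAsyncCommitSuccess(oldOffsets, staleSeqno);
        return task.acceptsTaskOffset("bar-1");
    }

    public static void main(String[] args) {
        System.out.println("unguarded accepts bar-1: " + replay(false));
        System.out.println("guarded accepts bar-1:   " + replay(true));
    }
}
```

In the unguarded replay the late callback replaces the map with the old assignment, so bar-1 fails the containsKey check (the "partition not assigned" warning). In the guarded replay the late callback is dropped and bar-1 is still accepted.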

--
Shrijeet

On Thu, Sep 7, 2017 at 6:04 PM, Shrijeet Paliwal <shrijeet.pali...@gmail.com
> wrote:

>
> *Kafka Version: 0.10.2.1*
>
> Hi,
>
> I am running a custom connector (in distributed mode) and noticed that the
> lag on one of its partitions keeps increasing even though the partition is
> assigned to a Connect worker. The relevant messages from the Connect log follow:
>
> [DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask
> onPartitionsAssigned - foo.sink-16 assigned topic partition bar-1 with
> offset 1871179
> [DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask commitOffsets -
> WorkerSinkTask{id=foo.sink-16} Skipping offset commit, no change since last
> commit
> [DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onCommitCompleted -
> Finished WorkerSinkTask{id=foo.sink-16} offset commit successfully in 0 ms
> [DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onCommitCompleted -
> Got callback for timed out commit WorkerSinkTask{id=foo.sink-16}: 2, but
> most recent commit is 4
> [WARN] 2017-09-07 14:32:54,573 runtime.WorkerSinkTask commitOffsets -
> Ignoring invalid task provided offset bar-1/OffsetAndMetadata{offset=1871179,
> metadata=''} -- partition not assigned
>
>
> Looking at the code, it seems a callback from a previous async offset commit
> could reset lastCommittedOffsets to an incorrect state, causing all
> subsequent commits to be ignored (see the logs above). The relevant code
> snippets are:
>
> private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets,
> boolean closing, final int seqno) {
>        log.info("{} Committing offsets", this);
>        if (closing) {
>            doCommitSync(offsets, seqno);
>        } else {
>            OffsetCommitCallback cb = new OffsetCommitCallback() {
>                @Override
>                public void onComplete(Map<TopicPartition,
> OffsetAndMetadata> offsets, Exception error) {
>                    if (error == null) {
>                      *  lastCommittedOffsets = offsets;*
>                    }
>                    onCommitCompleted(error, seqno);
>                }
>            };
>            consumer.commitAsync(offsets, cb);
>        }
>    }
>
>
> *---*
>
>
> private void commitOffsets(long now, boolean closing) {
>     <snip>
>     .....
>     final Map<TopicPartition, OffsetAndMetadata> commitableOffsets =
>             new HashMap<>(lastCommittedOffsets);
> *    if (commitableOffsets.containsKey(partition)) {*
>         if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
>             commitableOffsets.put(partition, taskProvidedOffset);
>         } else {
>             log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed",
>                     partition, taskProvidedOffset);
>         }
>     } else {
>         *log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned",
>                 partition, taskProvidedOffset);*
>     }
>     <snip>
>     ...
> }
>
>
>
> *-----*
>
> Is my understanding correct that this looks like a bug?
>
>
> --
> Shrijeet
>
