*Kafka Version: 0.10.2.1*

Hi,

I am running a custom connector (in distributed mode) and noticed that the
lag on one of its partitions keeps increasing even though the partition is
assigned to a Connect worker. The relevant messages from the Connect log
follow:

[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onPartitionsAssigned - foo.sink-16 assigned topic partition bar-1 with offset 1871179
[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask commitOffsets - WorkerSinkTask{id=foo.sink-16} Skipping offset commit, no change since last commit
[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onCommitCompleted - Finished WorkerSinkTask{id=foo.sink-16} offset commit successfully in 0 ms
[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onCommitCompleted - Got callback for timed out commit WorkerSinkTask{id=foo.sink-16}: 2, but most recent commit is 4
[WARN] 2017-09-07 14:32:54,573 runtime.WorkerSinkTask commitOffsets - Ignoring invalid task provided offset bar-1/OffsetAndMetadata{offset=1871179, metadata=''} -- partition not assigned


Looking at the code, it seems that a callback from an earlier async offset
commit can reset lastCommittedOffsets to an incorrect state, after which all
subsequent commits are ignored (see the logs above). The relevant code
snippets are:

private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) {
    log.info("{} Committing offsets", this);
    if (closing) {
        doCommitSync(offsets, seqno);
    } else {
        OffsetCommitCallback cb = new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
                if (error == null) {
                    lastCommittedOffsets = offsets;  // <-- the assignment in question
                }
                onCommitCompleted(error, seqno);
            }
        };
        consumer.commitAsync(offsets, cb);
    }
}


*---*


private void commitOffsets(long now, boolean closing) {
    <snip>
    .....
    final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
    if (commitableOffsets.containsKey(partition)) {  // <-- false once the map has been reset
        if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
            commitableOffsets.put(partition, taskProvidedOffset);
        } else {
            log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed", partition, taskProvidedOffset);
        }
    } else {
        // <-- this is the warning seen in the log above
        log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned", partition, taskProvidedOffset);
    }
    <snip>
    ...
}



*-----*
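
To make the suspected sequence concrete, here is a minimal standalone sketch.
It is not the Connect code: it only mimics the two snippets above using plain
strings for partitions and no Kafka classes. It assumes that a rebalance
rebuilds lastCommittedOffsets from the new assignment, which is my reading of
the code rather than something I have confirmed. The partition bar-0, its
offset 100, and the guardBySeqno flag are made up for illustration; the guard
is only one possible way to ignore a late callback, not a proposed patch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class StaleCommitCallbackSketch {
    // Stand-ins for the WorkerSinkTask fields discussed above.
    private Map<String, Long> lastCommittedOffsets = new HashMap<>();
    private int commitSeqno = 0;
    // Hypothetical switch: when true, late callbacks are not applied.
    private final boolean guardBySeqno;

    StaleCommitCallbackSketch(boolean guardBySeqno) {
        this.guardBySeqno = guardBySeqno;
    }

    // Assumption: a rebalance rebuilds the committed state from the new assignment.
    void onPartitionsAssigned(Map<String, Long> positions) {
        lastCommittedOffsets = new HashMap<>(positions);
    }

    // Simulates commitAsync: returns the callback so the caller decides when it fires.
    Runnable commitAsync(Map<String, Long> offsets) {
        final int seqno = ++commitSeqno;
        final Map<String, Long> committed = new HashMap<>(offsets);
        return () -> {
            // Unguarded, this mirrors "lastCommittedOffsets = offsets" in doCommit.
            if (!guardBySeqno || seqno == commitSeqno) {
                lastCommittedOffsets = committed;
            }
        };
    }

    // Mirrors the containsKey check in commitOffsets.
    boolean acceptsTaskOffset(String partition) {
        Map<String, Long> commitableOffsets = new HashMap<>(lastCommittedOffsets);
        return commitableOffsets.containsKey(partition);
    }

    // Replays: commit for bar-0, rebalance to bar-1, newer commit, then the old callback.
    static boolean simulate(boolean guardBySeqno) {
        StaleCommitCallbackSketch task = new StaleCommitCallbackSketch(guardBySeqno);
        task.onPartitionsAssigned(Collections.singletonMap("bar-0", 100L));
        Runnable stale = task.commitAsync(Collections.singletonMap("bar-0", 100L));
        task.onPartitionsAssigned(Collections.singletonMap("bar-1", 1871179L));
        task.commitAsync(Collections.singletonMap("bar-1", 1871179L)); // newer commit
        stale.run(); // the old callback arrives late
        return task.acceptsTaskOffset("bar-1");
    }

    public static void main(String[] args) {
        System.out.println("unguarded accepts bar-1: " + simulate(false));
        System.out.println("guarded accepts bar-1: " + simulate(true));
    }
}
```

Without the guard, the late callback replaces the map with one that only
contains bar-0, so the check for bar-1 fails and the sketch takes the same
"partition not assigned" branch as in the log. With the guard, the late
callback is skipped and bar-1 is still accepted.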

Is my understanding correct that this looks like a bug?


--
Shrijeet
