C0urante commented on a change in pull request #10563: URL: https://github.com/apache/kafka/pull/10563#discussion_r671404000
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ########## @@ -412,32 +425,36 @@ private void commitOffsets(long now, boolean closing) { return; } - final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets); + Collection<TopicPartition> allAssignedTopicPartitions = consumer.assignment(); + final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>(lastCommittedOffsets); for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) { final TopicPartition partition = taskProvidedOffsetEntry.getKey(); final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue(); - if (commitableOffsets.containsKey(partition)) { + if (committableOffsets.containsKey(partition)) { long taskOffset = taskProvidedOffset.offset(); - long currentOffset = currentOffsets.get(partition).offset(); + long currentOffset = offsetsToCommit.get(partition).offset(); if (taskOffset <= currentOffset) { - commitableOffsets.put(partition, taskProvidedOffset); + committableOffsets.put(partition, taskProvidedOffset); } else { log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}", - this, partition, taskProvidedOffset, taskOffset, currentOffset); + this, partition, taskProvidedOffset, taskOffset, currentOffset); } - } else { + } else if (!allAssignedTopicPartitions.contains(partition)) { log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}", - this, partition, taskProvidedOffset, consumer.assignment()); + this, partition, taskProvidedOffset, allAssignedTopicPartitions); + } else { + log.debug("{} Ignoring task provided offset {}/{} -- topic partition not requested, requested={}", Review comment: Good catch, `s/topic partition/partition/` "Requested" here means that although the partition is assigned to the task, it is not one of the partitions that we are currently committing offsets for. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org