C0urante commented on a change in pull request #10563:
URL: https://github.com/apache/kafka/pull/10563#discussion_r671404000



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -412,32 +425,36 @@ private void commitOffsets(long now, boolean closing) {
             return;
         }
 
-        final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new 
HashMap<>(lastCommittedOffsets);
+        Collection<TopicPartition> allAssignedTopicPartitions = 
consumer.assignment();
+        final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new 
HashMap<>(lastCommittedOffsets);
         for (Map.Entry<TopicPartition, OffsetAndMetadata> 
taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
             final TopicPartition partition = taskProvidedOffsetEntry.getKey();
             final OffsetAndMetadata taskProvidedOffset = 
taskProvidedOffsetEntry.getValue();
-            if (commitableOffsets.containsKey(partition)) {
+            if (committableOffsets.containsKey(partition)) {
                 long taskOffset = taskProvidedOffset.offset();
-                long currentOffset = currentOffsets.get(partition).offset();
+                long currentOffset = offsetsToCommit.get(partition).offset();
                 if (taskOffset <= currentOffset) {
-                    commitableOffsets.put(partition, taskProvidedOffset);
+                    committableOffsets.put(partition, taskProvidedOffset);
                 } else {
                     log.warn("{} Ignoring invalid task provided offset {}/{} 
-- not yet consumed, taskOffset={} currentOffset={}",
-                            this, partition, taskProvidedOffset, taskOffset, 
currentOffset);
+                        this, partition, taskProvidedOffset, taskOffset, 
currentOffset);
                 }
-            } else {
+            } else if (!allAssignedTopicPartitions.contains(partition)) {
                 log.warn("{} Ignoring invalid task provided offset {}/{} -- 
partition not assigned, assignment={}",
-                        this, partition, taskProvidedOffset, 
consumer.assignment());
+                        this, partition, taskProvidedOffset, 
allAssignedTopicPartitions);
+            } else {
+                log.debug("{} Ignoring task provided offset {}/{} -- topic 
partition not requested, requested={}",

Review comment:
       Good catch, `s/topic partition/partition/`
   
   "Requested" here means that, although the partition is assigned to the task, 
it is not one of the partitions for which we are currently committing offsets.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to