C0urante commented on a change in pull request #10563:
URL: https://github.com/apache/kafka/pull/10563#discussion_r671422842



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -710,22 +758,35 @@ else if (!context.pausedPartitions().isEmpty())
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            onPartitionsRemoved(partitions, false);
+        }
+
+        @Override
+        public void onPartitionsLost(Collection<TopicPartition> partitions) {
+            onPartitionsRemoved(partitions, true);
+        }
+
+        private void onPartitionsRemoved(Collection<TopicPartition> partitions, boolean lost) {
             if (taskStopped) {
                 log.trace("Skipping partition revocation callback as task has already been stopped");
                 return;
             }
-            log.debug("{} Partitions revoked", WorkerSinkTask.this);
+            log.debug("{} Partitions {}: {}", WorkerSinkTask.this, lost ? "lost" : "revoked", partitions);
+
+            if (partitions.isEmpty())
+                return;
+
             try {
-                closePartitions();
-                sinkTaskMetricsGroup.clearOffsets();
+                closePartitions(partitions, lost);
+                sinkTaskMetricsGroup.clearOffsets(partitions);
             } catch (RuntimeException e) {
                 // The consumer swallows exceptions raised in the rebalance listener, so we need to store
                 // exceptions and rethrow when poll() returns.
                 rebalanceException = e;
             }
 
-            // Make sure we don't have any leftover data since offsets will be reset to committed positions
-            messageBatch.clear();
+            // Make sure we don't have any leftover data since offsets for these partitions will be reset to committed positions
+            messageBatch.removeIf(record -> partitions.contains(new TopicPartition(record.topic(), record.kafkaPartition())));

Review comment:
   The `removeIf` call will certainly be more expensive than the `clear()` it replaces, but the extra cost only matters in an edge case: the task throws a `RetriableException` from `SinkTask::put` and then, before a follow-up invocation of `SinkTask::put` is able to succeed, a consumer rebalance takes place and partitions are revoked.

   A naive improvement might be to convert `messageBatch` to a `Map<TopicPartition, List<SinkRecord>>`, which would let us quickly drop all records belonging to a given topic partition. But that would be less efficient overall: we'd have to populate the map after retrieving the original list of records from `Consumer::poll`, and then re-join the per-partition lists before delivering records to the task.

   I'm not sure there's a good way around this. If you believe it's a blocker, I'm happy to spend some more time on it.
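
   To make the trade-off concrete, here is a minimal, self-contained sketch of the two approaches discussed above. It is not the code in this PR: `Tp` and `Rec` are hypothetical stand-ins for `TopicPartition` and `SinkRecord` so that it compiles without Kafka on the classpath, and copying the revoked partitions into a `HashSet` is my own assumption, added so that `contains` is a constant-time lookup regardless of the collection type the consumer passes in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class BatchFilterSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    public static final class Tp {
        final String topic;
        final int partition;

        public Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Tp)) return false;
            Tp other = (Tp) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Hypothetical stand-in for SinkRecord: only the fields this sketch needs.
    public static final class Rec {
        final Tp tp;
        final long offset;

        public Rec(String topic, int partition, long offset) {
            this.tp = new Tp(topic, partition);
            this.offset = offset;
        }
    }

    // Flat-list approach (the shape used in the diff): one pass over the batch,
    // paid only when a rebalance happens while the batch is non-empty.
    public static void removeRevoked(List<Rec> batch, Collection<Tp> revoked) {
        Set<Tp> revokedSet = new HashSet<>(revoked); // assumption: not in the PR
        batch.removeIf(r -> revokedSet.contains(r.tp));
    }

    // Map-based alternative, step 1: group records by partition.
    // This cost would be paid after every poll, not only on rebalance.
    public static Map<Tp, List<Rec>> group(List<Rec> polled) {
        Map<Tp, List<Rec>> byPartition = new LinkedHashMap<>();
        for (Rec r : polled) {
            byPartition.computeIfAbsent(r.tp, k -> new ArrayList<>()).add(r);
        }
        return byPartition;
    }

    // Map-based alternative, step 2: re-join the lists before delivery.
    // This cost would also be paid on every delivery attempt.
    public static List<Rec> flatten(Map<Tp, List<Rec>> byPartition) {
        List<Rec> out = new ArrayList<>();
        for (List<Rec> records : byPartition.values()) {
            out.addAll(records);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> polled = Arrays.asList(
            new Rec("a", 0, 1L), new Rec("a", 1, 2L), new Rec("b", 0, 3L));
        List<Tp> revoked = Arrays.asList(new Tp("a", 0));

        List<Rec> flat = new ArrayList<>(polled);
        removeRevoked(flat, revoked);

        Map<Tp, List<Rec>> grouped = group(polled);
        grouped.keySet().removeAll(revoked); // cheap revocation, one removal per partition
        List<Rec> rejoined = flatten(grouped);

        System.out.println(flat.size() + " records left (flat), "
            + rejoined.size() + " records left (map)");
    }
}
```

   With the map, revocation itself becomes one removal per partition, but `group` and `flatten` run on the hot path for every batch, whereas the flat-list scan runs only in the rebalance edge case described above. Note also that `flatten` emits records partition by partition, so the cross-partition ordering may differ from the list that was originally grouped.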




