C0urante commented on a change in pull request #10563: URL: https://github.com/apache/kafka/pull/10563#discussion_r671422842
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ##########
@@ -710,22 +758,35 @@ else if (!context.pausedPartitions().isEmpty())
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            onPartitionsRemoved(partitions, false);
+        }
+
+        @Override
+        public void onPartitionsLost(Collection<TopicPartition> partitions) {
+            onPartitionsRemoved(partitions, true);
+        }
+
+        private void onPartitionsRemoved(Collection<TopicPartition> partitions, boolean lost) {
             if (taskStopped) {
                 log.trace("Skipping partition revocation callback as task has already been stopped");
                 return;
             }
-            log.debug("{} Partitions revoked", WorkerSinkTask.this);
+            log.debug("{} Partitions {}: {}", WorkerSinkTask.this, lost ? "lost" : "revoked", partitions);
+
+            if (partitions.isEmpty())
+                return;
+
             try {
-                closePartitions();
-                sinkTaskMetricsGroup.clearOffsets();
+                closePartitions(partitions, lost);
+                sinkTaskMetricsGroup.clearOffsets(partitions);
             } catch (RuntimeException e) {
                 // The consumer swallows exceptions raised in the rebalance listener, so we need to store
                 // exceptions and rethrow when poll() returns.
                 rebalanceException = e;
             }
-            // Make sure we don't have any leftover data since offsets will be reset to committed positions
-            messageBatch.clear();
+            // Make sure we don't have any leftover data since offsets for these partitions will be reset to committed positions
+            messageBatch.removeIf(record -> partitions.contains(new TopicPartition(record.topic(), record.kafkaPartition())));

Review comment:
It will certainly be more expensive than clearing the whole batch, but the extra cost only arises in an edge case: the task has thrown a `RetriableException` from `SinkTask::put` and then, before a follow-up invocation of `SinkTask::put` succeeds, a consumer rebalance takes place and partitions are revoked.
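To make the trade-off concrete, here is a minimal, self-contained sketch (Java 16+) of the partition-scoped filtering that the `removeIf` line performs. `TopicPartitionKey`, `BufferedRecord`, and `removeRecordsFor` are hypothetical stand-ins for Kafka's `TopicPartition`, `SinkRecord`, and the inline lambda, and copying the removed partitions into a `HashSet` is an assumption of this sketch, not part of the change under review.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class PartitionScopedClear {

    // Hypothetical stand-ins for org.apache.kafka.common.TopicPartition and
    // org.apache.kafka.connect.sink.SinkRecord, so the sketch compiles without Kafka.
    record TopicPartitionKey(String topic, int partition) { }
    record BufferedRecord(String topic, Integer kafkaPartition, String value) { }

    // Drops only the buffered records that belong to revoked or lost partitions,
    // keeping records for partitions that are still assigned (the old code cleared everything).
    static void removeRecordsFor(List<BufferedRecord> messageBatch,
                                 Collection<TopicPartitionKey> removed) {
        // Assumption of this sketch: copy into a HashSet so each contains() is O(1) even if
        // the caller passes a List. The scan over messageBatch is still O(batch size).
        Set<TopicPartitionKey> removedSet = new HashSet<>(removed);
        messageBatch.removeIf(r ->
                removedSet.contains(new TopicPartitionKey(r.topic(), r.kafkaPartition())));
    }

    public static void main(String[] args) {
        List<BufferedRecord> batch = new ArrayList<>(List.of(
                new BufferedRecord("orders", 0, "a"),
                new BufferedRecord("orders", 1, "b"),
                new BufferedRecord("payments", 0, "c")));
        removeRecordsFor(batch, List.of(new TopicPartitionKey("orders", 1)));
        System.out.println(batch.stream().map(BufferedRecord::value).toList()); // prints [a, c]
    }
}
```

The filter always walks the whole batch, so its cost grows with the number of buffered records; that only becomes noticeable when a retry is pending and the batch is non-empty during the rebalance.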
A naive improvement might be to convert `messageBatch` to a `Map<TopicPartition, List<SinkRecord>>`, which would let us quickly drop the records belonging to a given topic partition. But that would be less efficient in the common case: we'd have to populate that map in the first place after retrieving the records from `Consumer::poll`, and then re-join those lists before delivering records to the task. I'm not sure there's a good way around this, but if you believe it's a blocker, I'm happy to spend some more time on it.
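A sketch of the `Map<TopicPartition, List<SinkRecord>>` alternative shows where the extra work would land. It uses hypothetical stand-in types and method names again (`groupByPartition`, `removePartitions`, `flatten` are illustrative, not Kafka APIs). Removing a partition becomes one map removal, but every poll pays for grouping the records, and every delivery pays for flattening them back into a single list.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionedBatchSketch {

    // Hypothetical stand-ins for Kafka's TopicPartition and SinkRecord.
    record TopicPartitionKey(String topic, int partition) { }
    record BufferedRecord(String topic, Integer kafkaPartition, String value) { }

    // Extra pass #1 (every poll): group the polled records by partition.
    // LinkedHashMap keeps partitions in first-seen order; order within a partition is preserved.
    static Map<TopicPartitionKey, List<BufferedRecord>> groupByPartition(List<BufferedRecord> polled) {
        Map<TopicPartitionKey, List<BufferedRecord>> batch = new LinkedHashMap<>();
        for (BufferedRecord r : polled) {
            batch.computeIfAbsent(new TopicPartitionKey(r.topic(), r.kafkaPartition()),
                    tp -> new ArrayList<>()).add(r);
        }
        return batch;
    }

    // The cheap part: dropping a revoked or lost partition is a single map removal.
    static void removePartitions(Map<TopicPartitionKey, List<BufferedRecord>> batch,
                                 Collection<TopicPartitionKey> removed) {
        for (TopicPartitionKey tp : removed) {
            batch.remove(tp);
        }
    }

    // Extra pass #2 (every delivery): re-join the per-partition lists into one list for the task.
    static List<BufferedRecord> flatten(Map<TopicPartitionKey, List<BufferedRecord>> batch) {
        List<BufferedRecord> out = new ArrayList<>();
        for (List<BufferedRecord> records : batch.values()) {
            out.addAll(records);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<TopicPartitionKey, List<BufferedRecord>> batch = groupByPartition(List.of(
                new BufferedRecord("orders", 0, "a"),
                new BufferedRecord("orders", 1, "b"),
                new BufferedRecord("orders", 0, "c")));
        removePartitions(batch, List.of(new TopicPartitionKey("orders", 1)));
        System.out.println(flatten(batch).stream().map(BufferedRecord::value).toList()); // prints [a, c]
    }
}
```

Both extra passes are O(batch size) on every poll and delivery, whereas the `removeIf` scan only pays that cost during a rebalance.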