bbejeck commented on code in PR #19421: URL: https://github.com/apache/kafka/pull/19421#discussion_r2038207878
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -180,6 +186,25 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { */ private class BackgroundEventProcessor implements EventProcessor<BackgroundEvent> { + private Optional<StreamsRebalanceListener> streamsGroupRebalanceCallbacks = Optional.empty(); Review Comment: nit: the field is named `streamsGroupRebalanceCallbacks`, but the type wrapped by the `Optional` is `StreamsRebalanceListener`; maybe rename it to `streamsRebalanceListener`? Up to you. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -213,6 +250,79 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { throw invokedEvent.error().get(); } } + + private void processStreamsOnTasksRevokedCallbackNeededEvent(final StreamsOnTasksRevokedCallbackNeededEvent event) { + StreamsOnTasksRevokedCallbackCompletedEvent invokedEvent = invokeOnTasksRevokedCallback(event.activeTasksToRevoke(), event.future()); + applicationEventHandler.add(invokedEvent); + if (invokedEvent.error().isPresent()) { + throw invokedEvent.error().get(); + } + } + + private void processStreamsOnTasksAssignedCallbackNeededEvent(final StreamsOnTasksAssignedCallbackNeededEvent event) { + StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent = invokeOnTasksAssignedCallback(event.assignment(), event.future()); + applicationEventHandler.add(invokedEvent); + if (invokedEvent.error().isPresent()) { + throw invokedEvent.error().get(); + } + } + + private void processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllTasksLostCallbackNeededEvent event) { + StreamsOnAllTasksLostCallbackCompletedEvent invokedEvent = invokeOnAllTasksLostCallback(event.future()); + applicationEventHandler.add(invokedEvent); + if (invokedEvent.error().isPresent()) { + throw invokedEvent.error().get(); + } + } + + private 
StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId> activeTasksToRevoke, + final CompletableFuture<Void> future) { + final Optional<KafkaException> error; + final Optional<Exception> exceptionFromCallback = streamsGroupRebalanceCallbacks().onTasksRevoked(activeTasksToRevoke); + if (exceptionFromCallback.isPresent()) { + error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task revocation callback throws an error")); + } else { + error = Optional.empty(); + } Review Comment: ```suggestion error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "Task revocation callback throws an error")); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org