bbejeck commented on code in PR #17795:
URL: https://github.com/apache/kafka/pull/17795#discussion_r1856742919


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java:
##########
@@ -281,17 +300,173 @@ public String toString() {
         }
     }
 
+    private final BlockingQueue<BackgroundEvent> onCallbackRequests = new LinkedBlockingQueue<>();
+
+    private ApplicationEventHandler applicationEventHandler = null;
+
+    private Optional<Function<Set<StreamsAssignmentInterface.TaskId>, Optional<Exception>>> onTasksRevokedCallback = null;
+    private Optional<Function<Assignment, Optional<Exception>>> onTasksAssignedCallback = null;
+    private Optional<Supplier<Optional<Exception>>> onAllTasksLostCallback = null;
+
+    private final StreamsRebalanceEventProcessor streamsRebalanceEventProcessor;
+
+    private class StreamsRebalanceEventProcessor implements EventProcessor<BackgroundEvent> {
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorEvent) event);

Review Comment:
   Maybe just have `throw event.error();` here instead of the one-line method.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java:
##########
@@ -781,43 +889,6 @@ private void setupStreamsAssignmentInterfaceWithTwoSubtopologies(final String su
         );
     }
 
-    private void setupStreamsAssignmentInterfaceWithTwoConcatenedSubtopologies(final String subtopologyId1,
-                                                                               final String topicName1,
-                                                                               final String subtopologyId2,
-                                                                               final String topicName2) {
-        when(streamsAssignmentInterface.subtopologyMap()).thenReturn(
-            mkMap(
-                mkEntry(
-                    subtopologyId1,
-                    new StreamsAssignmentInterface.Subtopology(
-                        Set.of(topicName1),
-                        Collections.emptySet(),
-                        Collections.emptyMap(),
-                        Collections.emptyMap(),
-                        Collections.emptyList()
-                    )
-                ),
-                mkEntry(
-                    subtopologyId2,
-                    new StreamsAssignmentInterface.Subtopology(
-                        Set.of(topicName2),
-                        Collections.emptySet(),
-                        mkMap(mkEntry(
-                            topicName2,
-                            new StreamsAssignmentInterface.TopicInfo(
-                                Optional.empty(),
-                                Optional.empty(),
-                                Collections.emptyMap()
-                            )
-                        )),
-                        Collections.emptyMap(),
-                        Collections.emptyList()
-                    )
-                )
-            )
-        );
-    }
-

Review Comment:
   We removed an assignment with 2 subtopologies - will we add it back at some 
point, or is it present in another form?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -422,6 +439,33 @@ private void process(final ShareAcknowledgementCommitCallbackRegistrationEvent e
         manager.setAcknowledgementCommitCallbackRegistered(event.isCallbackRegistered());
     }
 
+    private void process(final StreamsOnTasksRevokedCallbackCompletedEvent event) {
+        if (!requestManagers.streamsMembershipManager.isPresent()) {

Review Comment:
   Maybe use `requestManagers.streamsMembershipManager.isEmpty()` instead here 
and below.
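
   A minimal sketch of the suggested check, assuming the module compiles against Java 11 or newer, where `Optional.isEmpty()` is available. The class `OptionalIsEmptySketch` and the `String` stand-in for the membership manager are hypothetical and only illustrate the idiom; they are not Kafka classes.

```java
import java.util.Optional;

// Hypothetical stand-in for the event processor; not part of Kafka.
final class OptionalIsEmptySketch {

    // Describes how an event would be handled depending on whether
    // the (assumed) membership manager is present.
    static String handle(final Optional<String> membershipManager) {
        // Optional.isEmpty() (Java 11+) is equivalent to !isPresent(),
        // but reads without the negation.
        if (membershipManager.isEmpty()) {
            return "ignored: no membership manager";
        }
        return "handled by " + membershipManager.get();
    }

    public static void main(final String[] args) {
        System.out.println(handle(Optional.empty()));
        System.out.println(handle(Optional.of("streamsMembershipManager")));
    }
}
```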



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java:
##########
@@ -281,17 +300,173 @@ public String toString() {
         }
     }
 
+    private final BlockingQueue<BackgroundEvent> onCallbackRequests = new 
LinkedBlockingQueue<>();
+
+    private ApplicationEventHandler applicationEventHandler = null;
+
+    private Optional<Function<Set<StreamsAssignmentInterface.TaskId>, 
Optional<Exception>>> onTasksRevokedCallback = null;
+    private Optional<Function<Assignment, Optional<Exception>>> 
onTasksAssignedCallback = null;
+    private Optional<Supplier<Optional<Exception>>> onAllTasksLostCallback = 
null;
+
+    private final StreamsRebalanceEventProcessor 
streamsRebalanceEventProcessor;
+
+    private class StreamsRebalanceEventProcessor implements 
EventProcessor<BackgroundEvent> {
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorEvent) event);
+                    break;
+
+                case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
+                    process((StreamsOnTasksRevokedCallbackNeededEvent) event);
+                    break;
+
+                case STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED:
+                    process((StreamsOnTasksAssignedCallbackNeededEvent) event);
+                    break;
+
+                case STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED:
+                    process((StreamsOnAllTasksLostCallbackNeededEvent) event);
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+
+            }
+        }
+
+        private void process(final ErrorEvent event) {
+            throw event.error();
+        }
+
+        private void process(final StreamsOnTasksRevokedCallbackNeededEvent event) {

Review Comment:
   suggestion: rename the method to `processOnTasksRevokedCallbackNeeded`.
   Same for the other two methods below, but this is a personal preference, 
so feel free to leave as is.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java:
##########
@@ -281,17 +300,173 @@ public String toString() {
         }
     }
 
+    private final BlockingQueue<BackgroundEvent> onCallbackRequests = new 
LinkedBlockingQueue<>();
+
+    private ApplicationEventHandler applicationEventHandler = null;
+
+    private Optional<Function<Set<StreamsAssignmentInterface.TaskId>, 
Optional<Exception>>> onTasksRevokedCallback = null;
+    private Optional<Function<Assignment, Optional<Exception>>> 
onTasksAssignedCallback = null;
+    private Optional<Supplier<Optional<Exception>>> onAllTasksLostCallback = 
null;
+
+    private final StreamsRebalanceEventProcessor 
streamsRebalanceEventProcessor;
+
+    private class StreamsRebalanceEventProcessor implements 
EventProcessor<BackgroundEvent> {
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorEvent) event);
+                    break;
+
+                case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
+                    process((StreamsOnTasksRevokedCallbackNeededEvent) event);
+                    break;
+
+                case STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED:
+                    process((StreamsOnTasksAssignedCallbackNeededEvent) event);
+                    break;
+
+                case STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED:
+                    process((StreamsOnAllTasksLostCallbackNeededEvent) event);
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+
+            }
+        }
+
+        private void process(final ErrorEvent event) {
+            throw event.error();
+        }
+
+        private void process(final StreamsOnTasksRevokedCallbackNeededEvent 
event) {
+            StreamsOnTasksRevokedCallbackCompletedEvent invokedEvent = 
invokeOnTasksRevokedCallback(event.activeTasksToRevoke(), event.future());
+            applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();
+            }
+        }
+
+        private void process(final StreamsOnTasksAssignedCallbackNeededEvent 
event) {
+            StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent = 
invokeOnTasksAssignedCallback(event.assignment(), event.future());
+            applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();
+            }
+        }
+
+        private void process(final StreamsOnAllTasksLostCallbackNeededEvent 
event) {
+            StreamsOnAllTasksLostCallbackCompletedEvent invokedEvent = 
invokeOnAllTasksLostCallback(event.future());
+            applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();
+            }
+        }
+
+        private StreamsOnTasksRevokedCallbackCompletedEvent 
invokeOnTasksRevokedCallback(final Set<StreamsAssignmentInterface.TaskId> 
activeTasksToRevoke,
+                                                                               
           final CompletableFuture<Void> future) {
+            final Optional<Exception> exceptionFromCallback = 
onTasksRevokedCallback
+                .orElseThrow(() -> new IllegalStateException("No tasks 
assignment callback set!")).apply(activeTasksToRevoke);
+
+            return exceptionFromCallback
+                .map(exception ->
+                    new StreamsOnTasksRevokedCallbackCompletedEvent(
+                        future,
+                        
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exception, "Task revocation 
callback throws an error"))
+                    ))
+                .orElseGet(() -> new 
StreamsOnTasksRevokedCallbackCompletedEvent(future, Optional.empty()));
+        }
+
+        private StreamsOnTasksAssignedCallbackCompletedEvent 
invokeOnTasksAssignedCallback(final StreamsAssignmentInterface.Assignment 
assignment,
+                                                                               
            final CompletableFuture<Void> future) {
+            Optional<KafkaException> error = Optional.empty();
+            // ToDo: Can we avoid the following check?
+            if (!assignment.equals(reconciledAssignment.get())) {
+
+                final Optional<Exception> exceptionFromCallback = onTasksAssignedCallback
+                    .orElseThrow(() -> new IllegalStateException("No tasks assignment callback set!")).apply(assignment);
+
+                if (exceptionFromCallback.isPresent()) {
+                    error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task assignment callback throws an error"));
+                } else {
+                    reconciledAssignment.set(assignment);
+                }

Review Comment:
   Maybe use `Optional.ifPresentOrElse`?
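
   A rough sketch of what that could look like, assuming Java 9 or newer for `Optional.ifPresentOrElse`. All names here (`IfPresentOrElseSketch`, `RECONCILED`, the `String` assignment, and the plain `RuntimeException` wrapper) are hypothetical stand-ins, not the PR's types. Because a lambda cannot assign to a local variable, the sketch carries the error out through a holder.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration; a String stands in for the Assignment type.
final class IfPresentOrElseSketch {

    // Stand-in for the reconciled assignment tracked by the class under review.
    static final AtomicReference<String> RECONCILED = new AtomicReference<>("");

    // Returns the wrapped error if the callback reported one; otherwise
    // records the new assignment and returns an empty Optional.
    static Optional<RuntimeException> complete(final Optional<Exception> exceptionFromCallback,
                                               final String assignment) {
        // Holder needed because lambdas cannot assign to a local variable.
        final AtomicReference<RuntimeException> error = new AtomicReference<>();
        exceptionFromCallback.ifPresentOrElse(
            exception -> error.set(
                new RuntimeException("Task assignment callback throws an error", exception)),
            () -> RECONCILED.set(assignment));
        return Optional.ofNullable(error.get());
    }

    public static void main(final String[] args) {
        System.out.println(complete(Optional.empty(), "task-0_0").isPresent());
        System.out.println(RECONCILED.get());
    }
}
```

   The need for a holder is the trade-off: the branch logic becomes declarative, but the gain over a plain `if`/`else` is modest when one branch has to produce a value.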



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -709,6 +708,11 @@ public StreamThread(final Time time,
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
         this.cacheResizer = cacheResizer;
         this.streamsAssignmentInterface = streamsAssignmentInterface;
+        if (streamsAssignmentInterface != null) {

Review Comment:
   Maybe use an `Optional` for `streamsAssignmentInterface`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java:
##########
@@ -281,17 +300,173 @@ public String toString() {
         }
     }
 
+    private final BlockingQueue<BackgroundEvent> onCallbackRequests = new 
LinkedBlockingQueue<>();
+
+    private ApplicationEventHandler applicationEventHandler = null;
+
+    private Optional<Function<Set<StreamsAssignmentInterface.TaskId>, 
Optional<Exception>>> onTasksRevokedCallback = null;
+    private Optional<Function<Assignment, Optional<Exception>>> 
onTasksAssignedCallback = null;
+    private Optional<Supplier<Optional<Exception>>> onAllTasksLostCallback = 
null;
+
+    private final StreamsRebalanceEventProcessor 
streamsRebalanceEventProcessor;
+
+    private class StreamsRebalanceEventProcessor implements 
EventProcessor<BackgroundEvent> {
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorEvent) event);
+                    break;
+
+                case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
+                    process((StreamsOnTasksRevokedCallbackNeededEvent) event);
+                    break;
+
+                case STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED:
+                    process((StreamsOnTasksAssignedCallbackNeededEvent) event);
+                    break;
+
+                case STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED:
+                    process((StreamsOnAllTasksLostCallbackNeededEvent) event);
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+
+            }
+        }
+
+        private void process(final ErrorEvent event) {
+            throw event.error();
+        }
+
+        private void process(final StreamsOnTasksRevokedCallbackNeededEvent 
event) {
+            StreamsOnTasksRevokedCallbackCompletedEvent invokedEvent = 
invokeOnTasksRevokedCallback(event.activeTasksToRevoke(), event.future());
+            applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();
+            }
+        }
+
+        private void process(final StreamsOnTasksAssignedCallbackNeededEvent 
event) {
+            StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent = 
invokeOnTasksAssignedCallback(event.assignment(), event.future());
+            applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();
+            }
+        }
+
+        private void process(final StreamsOnAllTasksLostCallbackNeededEvent 
event) {
+            StreamsOnAllTasksLostCallbackCompletedEvent invokedEvent = 
invokeOnAllTasksLostCallback(event.future());
+            applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();
+            }
+        }
+
+        private StreamsOnTasksRevokedCallbackCompletedEvent 
invokeOnTasksRevokedCallback(final Set<StreamsAssignmentInterface.TaskId> 
activeTasksToRevoke,
+                                                                               
           final CompletableFuture<Void> future) {
+            final Optional<Exception> exceptionFromCallback = 
onTasksRevokedCallback
+                .orElseThrow(() -> new IllegalStateException("No tasks 
assignment callback set!")).apply(activeTasksToRevoke);
+
+            return exceptionFromCallback
+                .map(exception ->
+                    new StreamsOnTasksRevokedCallbackCompletedEvent(
+                        future,
+                        
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exception, "Task revocation 
callback throws an error"))
+                    ))
+                .orElseGet(() -> new 
StreamsOnTasksRevokedCallbackCompletedEvent(future, Optional.empty()));
+        }
+
+        private StreamsOnTasksAssignedCallbackCompletedEvent 
invokeOnTasksAssignedCallback(final StreamsAssignmentInterface.Assignment 
assignment,
+                                                                               
            final CompletableFuture<Void> future) {
+            Optional<KafkaException> error = Optional.empty();
+            // ToDo: Can we avoid the following check?
+            if (!assignment.equals(reconciledAssignment.get())) {
+
+                final Optional<Exception> exceptionFromCallback = 
onTasksAssignedCallback
+                    .orElseThrow(() -> new IllegalStateException("No tasks 
assignment callback set!")).apply(assignment);
+
+                if (exceptionFromCallback.isPresent()) {
+                    error = 
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(),
 "Task assignment callback throws an error"));
+                } else {
+                    reconciledAssignment.set(assignment);
+                }
+            }
+            return new StreamsOnTasksAssignedCallbackCompletedEvent(future, 
error);
+        }
+
+        private StreamsOnAllTasksLostCallbackCompletedEvent 
invokeOnAllTasksLostCallback(final CompletableFuture<Void> future) {
+            final Optional<Exception> exceptionFromCallback = onAllTasksLostCallback
+                .orElseThrow(() -> new IllegalStateException("No tasks assignment callback set!")).get();
+
+            final Optional<KafkaException> error;
+
+            if (exceptionFromCallback.isPresent()) {
+                error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task assignment callback throws an error"));
+            } else {
+                error = Optional.empty();
+                reconciledAssignment.set(Assignment.EMPTY);
+            }

Review Comment:
   Same here: maybe use `Optional.ifPresentOrElse`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java:
##########
@@ -281,17 +300,173 @@ public String toString() {
         }
     }
 
+    private final BlockingQueue<BackgroundEvent> onCallbackRequests = new 
LinkedBlockingQueue<>();
+
+    private ApplicationEventHandler applicationEventHandler = null;
+
+    private Optional<Function<Set<StreamsAssignmentInterface.TaskId>, 
Optional<Exception>>> onTasksRevokedCallback = null;
+    private Optional<Function<Assignment, Optional<Exception>>> 
onTasksAssignedCallback = null;
+    private Optional<Supplier<Optional<Exception>>> onAllTasksLostCallback = 
null;
+
+    private final StreamsRebalanceEventProcessor 
streamsRebalanceEventProcessor;
+
+    private class StreamsRebalanceEventProcessor implements 
EventProcessor<BackgroundEvent> {
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorEvent) event);
+                    break;
+
+                case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
+                    process((StreamsOnTasksRevokedCallbackNeededEvent) event);
+                    break;
+
+                case STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED:
+                    process((StreamsOnTasksAssignedCallbackNeededEvent) event);
+                    break;
+
+                case STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED:
+                    process((StreamsOnAllTasksLostCallbackNeededEvent) event);
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+
+            }
+        }
+
+        private void process(final ErrorEvent event) {
+            throw event.error();
+        }
+
+        private void process(final StreamsOnTasksRevokedCallbackNeededEvent 
event) {
+            StreamsOnTasksRevokedCallbackCompletedEvent invokedEvent = 
invokeOnTasksRevokedCallback(event.activeTasksToRevoke(), event.future());
+            applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();
+            }
+        }
+
+        private void process(final StreamsOnTasksAssignedCallbackNeededEvent 
event) {
+            StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent = 
invokeOnTasksAssignedCallback(event.assignment(), event.future());
+            applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();
+            }
+        }
+
+        private void process(final StreamsOnAllTasksLostCallbackNeededEvent 
event) {
+            StreamsOnAllTasksLostCallbackCompletedEvent invokedEvent = 
invokeOnAllTasksLostCallback(event.future());
+            applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();
+            }
+        }
+
+        private StreamsOnTasksRevokedCallbackCompletedEvent 
invokeOnTasksRevokedCallback(final Set<StreamsAssignmentInterface.TaskId> 
activeTasksToRevoke,
+                                                                               
           final CompletableFuture<Void> future) {
+            final Optional<Exception> exceptionFromCallback = 
onTasksRevokedCallback
+                .orElseThrow(() -> new IllegalStateException("No tasks 
assignment callback set!")).apply(activeTasksToRevoke);
+
+            return exceptionFromCallback
+                .map(exception ->
+                    new StreamsOnTasksRevokedCallbackCompletedEvent(
+                        future,
+                        
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exception, "Task revocation 
callback throws an error"))
+                    ))
+                .orElseGet(() -> new 
StreamsOnTasksRevokedCallbackCompletedEvent(future, Optional.empty()));
+        }
+
+        private StreamsOnTasksAssignedCallbackCompletedEvent 
invokeOnTasksAssignedCallback(final StreamsAssignmentInterface.Assignment 
assignment,
+                                                                               
            final CompletableFuture<Void> future) {
+            Optional<KafkaException> error = Optional.empty();
+            // ToDo: Can we avoid the following check?
+            if (!assignment.equals(reconciledAssignment.get())) {
+
+                final Optional<Exception> exceptionFromCallback = 
onTasksAssignedCallback
+                    .orElseThrow(() -> new IllegalStateException("No tasks 
assignment callback set!")).apply(assignment);
+
+                if (exceptionFromCallback.isPresent()) {
+                    error = 
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(),
 "Task assignment callback throws an error"));
+                } else {
+                    reconciledAssignment.set(assignment);
+                }
+            }
+            return new StreamsOnTasksAssignedCallbackCompletedEvent(future, 
error);
+        }
+
+        private StreamsOnAllTasksLostCallbackCompletedEvent 
invokeOnAllTasksLostCallback(final CompletableFuture<Void> future) {
+            final Optional<Exception> exceptionFromCallback = 
onAllTasksLostCallback
+                .orElseThrow(() -> new IllegalStateException("No tasks 
assignment callback set!")).get();
+
+            final Optional<KafkaException> error;
+
+            if (exceptionFromCallback.isPresent()) {
+                error = 
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(),
 "Task assignment callback throws an error"));
+            } else {
+                error = Optional.empty();
+                reconciledAssignment.set(Assignment.EMPTY);
+            }
+
+            return new StreamsOnAllTasksLostCallbackCompletedEvent(future, 
error);
+        }
+    }
+
     public StreamsAssignmentInterface(UUID processId,
                                       Optional<HostInfo> endpoint,
                                       Map<String, Subtopology> subtopologyMap,
-                                      Map<String, String> clientTags
-    ) {
+                                      Map<String, String> clientTags) {
         this.processId = processId;
         this.endpoint = endpoint;
         this.subtopologyMap = subtopologyMap;
         this.taskLags = new HashMap<>();
         this.shutdownRequested = new AtomicBoolean(false);
         this.clientTags = clientTags;
+        this.streamsRebalanceEventProcessor = new StreamsRebalanceEventProcessor();
+    }
+
+    public void setOnTasksRevokedCallback(final Function<Set<StreamsAssignmentInterface.TaskId>, Optional<Exception>> onTasksRevokedCallback) {
+        this.onTasksRevokedCallback = Optional.ofNullable(onTasksRevokedCallback);
+    }
+
+    public void setOnTasksAssignedCallback(final Function<Assignment, Optional<Exception>> onTasksAssignedCallback) {
+        this.onTasksAssignedCallback = Optional.ofNullable(onTasksAssignedCallback);
+    }
+
+    public void setOnAllTasksLostCallback(final Supplier<Optional<Exception>> onAllTasksLostCallback) {
+        this.onAllTasksLostCallback = Optional.ofNullable(onAllTasksLostCallback);
+    }
+
+    public void setApplicationEventHandler(final ApplicationEventHandler applicationEventHandler) {
+        this.applicationEventHandler = applicationEventHandler;
+    }
+
+    public CompletableFuture<Void> requestOnTasksAssignedCallbackInvocation(final Assignment assignment) {
+        final StreamsOnTasksAssignedCallbackNeededEvent onTasksAssignedCallbackNeededEvent = new StreamsOnTasksAssignedCallbackNeededEvent(assignment);
+        onCallbackRequests.add(onTasksAssignedCallbackNeededEvent);
+        return onTasksAssignedCallbackNeededEvent.future();
+    }
+
+    public CompletableFuture<Void> requestOnTasksRevokedCallbackInvocation(final Set<StreamsAssignmentInterface.TaskId> activeTasksToRevoke) {
+        final StreamsOnTasksRevokedCallbackNeededEvent onTasksRevokedCallbackNeededEvent = new StreamsOnTasksRevokedCallbackNeededEvent(activeTasksToRevoke);
+        onCallbackRequests.add(onTasksRevokedCallbackNeededEvent);
+        return onTasksRevokedCallbackNeededEvent.future();
+    }
+
+    public CompletableFuture<Void> requestOnAllTasksLostCallbackInvocation() {

Review Comment:
   Why do all three `CompletableFuture`s have no return value? I guess we're saying 
that if `CompletableFuture.get()` doesn't throw an `ExecutionException`, 
everything succeeded?
   Asking for my own understanding.
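
   For reference, a small sketch of the general `CompletableFuture<Void>` behavior the question is about: `get()` returns `null` when the future completes normally and throws an `ExecutionException` wrapping the cause when it completes exceptionally. The class `VoidFutureSketch` and its messages are hypothetical; whether the PR's futures are completed this way is an assumption, not something the diff above confirms.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration of how a Void future signals success or failure.
final class VoidFutureSketch {

    // Waits for the future and reports the outcome as text.
    static String outcome(final CompletableFuture<Void> future) throws InterruptedException {
        try {
            // A Void future carries no payload; a normal return is the success signal.
            future.get();
            return "succeeded";
        } catch (final ExecutionException e) {
            // Exceptional completion surfaces here with the original exception as cause.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final CompletableFuture<Void> ok = new CompletableFuture<>();
        ok.complete(null);

        final CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("callback failed"));

        System.out.println(outcome(ok));
        System.out.println(outcome(failed));
    }
}
```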



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
