lianetm commented on code in PR #16673:
URL: https://github.com/apache/kafka/pull/16673#discussion_r1707299652


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java:
##########
@@ -16,33 +16,31 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.Collections;
-import java.util.Map;
+import java.util.Collection;
 
-public class AssignmentChangeEvent extends ApplicationEvent {
+public class AssignmentChangeEvent extends CompletableApplicationEvent<Void> {
 
-    private final Map<TopicPartition, OffsetAndMetadata> offsets;
     private final long currentTimeMs;
+    private final Collection<TopicPartition> partitions;
 
-    public AssignmentChangeEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long currentTimeMs) {
-        super(Type.ASSIGNMENT_CHANGE);
-        this.offsets = Collections.unmodifiableMap(offsets);
+    public AssignmentChangeEvent(final long currentTimeMs, final long remainingTImeMs, final Collection<TopicPartition> partitions) {

Review Comment:
   Instead of `remainingTimeMs`, should we pass `deadlineMs`, as it's done for the other CompletableEvents? Just for consistency.
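   
   Roughly what I have in mind (just a sketch, assuming the `CompletableApplicationEvent` super constructor takes the absolute `deadlineMs`, as the other completable events do; the field assignments are only illustrative):
   
   ```java
   public AssignmentChangeEvent(final long currentTimeMs, final long deadlineMs, final Collection<TopicPartition> partitions) {
       // sketch: assumes a CompletableApplicationEvent(Type, long deadlineMs) super constructor,
       // i.e. the same shape the other completable events use
       super(Type.ASSIGNMENT_CHANGE, deadlineMs);
       this.currentTimeMs = currentTimeMs;
       this.partitions = partitions;
   }
   ```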



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java:
##########
@@ -16,33 +16,31 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.Collections;
-import java.util.Map;
+import java.util.Collection;
 
-public class AssignmentChangeEvent extends ApplicationEvent {
+public class AssignmentChangeEvent extends CompletableApplicationEvent<Void> {
 
-    private final Map<TopicPartition, OffsetAndMetadata> offsets;
     private final long currentTimeMs;

Review Comment:
   I notice that this is only needed to call `updateAutoCommitTimer` when processing an `AssignmentChangeEvent`. But we already have a `Time` in the `ApplicationEventProcessor`. So, could we simplify here, remove this `currentTimeMs`, and use that `time.milliseconds()` as the argument on ln 204 of the AppEventProcessor?
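   
   i.e. something like this on the processor side (sketch only; `time` is the `Time` instance the `ApplicationEventProcessor` already has, and `manager` stands for the `CommitRequestManager` resolved in that method):
   
   ```java
   // use the processor's own clock instead of the currentTimeMs carried by the event
   manager.updateAutoCommitTimer(time.milliseconds());
   ```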



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1443,12 +1443,15 @@ public void assign(Collection<TopicPartition> partitions) {
             // be no following rebalance.
             //
             // See the ApplicationEventProcessor.process() method that handles this event for more detail.
-            applicationEventHandler.add(new AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
-
-            log.info("Assigned to partition(s): {}", 
partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", 
")));
-
-            if (subscriptions.assignFromUser(new HashSet<>(partitions)))
-                applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+            Timer timer = time.timer(Long.MAX_VALUE);

Review Comment:
   Should we use the `defaultApiTimeoutMs` here instead? Just to be sure that, in the error case where the assign event does not complete, we don't block forever.
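   
   Something along these lines (sketch only, reusing the consumer's existing `time` and `defaultApiTimeoutMs` fields):
   
   ```java
   // bound the wait with the default API timeout instead of Long.MAX_VALUE,
   // so a failed or never-completed assign event cannot block forever
   Timer timer = time.timer(defaultApiTimeoutMs);
   ```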



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1748,8 +1767,7 @@ public void testMultipleBackgroundErrors() {
         final KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam");
         final ErrorEvent errorEvent2 = new ErrorEvent(expectedException2);
         backgroundEventQueue.add(errorEvent2);
-        consumer.assign(singletonList(new TopicPartition("topic", 0)));
-        final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
+        final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.assign(singletonList(new TopicPartition("topic", 0))));

Review Comment:
   Oh, I hadn't realized this undesired side effect of processing background events on assign, but of course, I missed that. I would say we don't want this change in behaviour.
   
   We only need to ensure that the assign does not return until the event completes, so there's no real need to process background events; we just need `applicationEventHandler.addAndGet(assignmentChangeEvent)` (and I would ensure we create the event using the `defaultApiTimeout` for the deadline). Makes sense? Then we would keep the same behaviour we had in this test (assign does not throw any background event, poll does).
   
   Just for the record, on the unsubscribe we do need to process background events because of the rebalanceCallbacks that run in the app thread, but that's not the case here.
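   
   For reference, the shape I mean (sketch only; the deadline computation is simplified here to `now + defaultApiTimeoutMs`, and the constructor follows the `deadlineMs` variant suggested above):
   
   ```java
   // block in assign() until the event is processed by the background thread,
   // without draining background events in the app thread (no callbacks need to run here)
   AssignmentChangeEvent assignmentChangeEvent =
       new AssignmentChangeEvent(time.milliseconds(), time.milliseconds() + defaultApiTimeoutMs, partitions);
   applicationEventHandler.addAndGet(assignmentChangeEvent);
   ```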


