lianetm commented on code in PR #16673:
URL: https://github.com/apache/kafka/pull/16673#discussion_r1707299652

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java:
##########
@@ -16,33 +16,31 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.Collections;
-import java.util.Map;
+import java.util.Collection;
 
-public class AssignmentChangeEvent extends ApplicationEvent {
+public class AssignmentChangeEvent extends CompletableApplicationEvent<Void> {
 
-    private final Map<TopicPartition, OffsetAndMetadata> offsets;
     private final long currentTimeMs;
+    private final Collection<TopicPartition> partitions;
 
-    public AssignmentChangeEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long currentTimeMs) {
-        super(Type.ASSIGNMENT_CHANGE);
-        this.offsets = Collections.unmodifiableMap(offsets);
+    public AssignmentChangeEvent(final long currentTimeMs, final long remainingTImeMs, final Collection<TopicPartition> partitions) {

Review Comment:
Instead of `remainingTimeMs`, should we pass `deadlineMs`, as is done for the other CompletableEvents? Just for consistency.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java:
##########
@@ -16,33 +16,31 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.Collections;
-import java.util.Map;
+import java.util.Collection;
 
-public class AssignmentChangeEvent extends ApplicationEvent {
+public class AssignmentChangeEvent extends CompletableApplicationEvent<Void> {
 
-    private final Map<TopicPartition, OffsetAndMetadata> offsets;
     private final long currentTimeMs;

Review Comment:
I notice that this is only needed to call `updateAutoCommitTimer` when processing an `AssignmentChangeEvent`. But we already have a `Time` in the `ApplicationEventProcessor`.
So, could we simplify here: remove this `currentTimeMs` and pass that `time.milliseconds()` as the argument on line 204 of the `ApplicationEventProcessor`?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1443,12 +1443,15 @@ public void assign(Collection<TopicPartition> partitions) {
             // be no following rebalance.
             //
             // See the ApplicationEventProcessor.process() method that handles this event for more detail.
-            applicationEventHandler.add(new AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
-
-            log.info("Assigned to partition(s): {}", partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
-
-            if (subscriptions.assignFromUser(new HashSet<>(partitions)))
-                applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+            Timer timer = time.timer(Long.MAX_VALUE);

Review Comment:
Should we use the `defaultApiTimeoutMs` here instead? Just to be sure that, in the error case where the assign event does not complete, we don't block forever.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1748,8 +1767,7 @@ public void testMultipleBackgroundErrors() {
         final KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam");
         final ErrorEvent errorEvent2 = new ErrorEvent(expectedException2);
         backgroundEventQueue.add(errorEvent2);
-        consumer.assign(singletonList(new TopicPartition("topic", 0)));
-        final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
+        final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.assign(singletonList(new TopicPartition("topic", 0))));

Review Comment:
Oh, I hadn't realized this undesired side effect of processing background events on assign; I missed that. I would say we don't want this change in behaviour.
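
To make the behaviour I have in mind concrete, here is a rough sketch. It is not the real consumer code: `AssignSketch`, `AssignEvent`, `deadlineMs` and this `addAndGet` are hypothetical stand-ins that only model the shape of the flow, and the "background thread" is just an async task. It assumes the event carries an absolute deadline derived from the default API timeout, and that assign blocks on its own event only, leaving queued background errors for poll.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the consumer; none of these names are the Kafka API.
public class AssignSketch {

    // Stand-in for a completable application event that carries an absolute deadline.
    static final class AssignEvent {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final long deadlineMs;

        AssignEvent(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    // Errors reported by the background thread; only poll() surfaces them.
    final Queue<RuntimeException> backgroundErrors = new ArrayDeque<>();
    final long defaultApiTimeoutMs;

    AssignSketch(long defaultApiTimeoutMs) {
        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
    }

    // Absolute deadline from "now" plus a timeout, saturating at Long.MAX_VALUE
    // on overflow. Assumes both inputs are non-negative.
    static long deadlineMs(long nowMs, long timeoutMs) {
        long deadline = nowMs + timeoutMs;
        return deadline < 0 ? Long.MAX_VALUE : deadline;
    }

    // Stand-in for "add the event and wait for it": blocks until the event
    // completes or its deadline passes, so the caller cannot block forever.
    void addAndGet(AssignEvent event) {
        // In this sketch the "background thread" completes the event right away.
        CompletableFuture.runAsync(() -> event.future.complete(null));
        long remainingMs = Math.max(0L, event.deadlineMs - System.currentTimeMillis());
        try {
            event.future.get(remainingMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for assign", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("assign did not complete", e);
        }
    }

    // Waits for its own event only; deliberately does not drain backgroundErrors.
    void assign() {
        long now = System.currentTimeMillis();
        addAndGet(new AssignEvent(deadlineMs(now, defaultApiTimeoutMs)));
    }

    // Background errors are surfaced here, not in assign().
    void poll() {
        RuntimeException error = backgroundErrors.poll();
        if (error != null) {
            throw error;
        }
    }

    public static void main(String[] args) {
        AssignSketch consumer = new AssignSketch(1_000L);
        consumer.backgroundErrors.add(new RuntimeException("Spam, Spam, Spam"));
        consumer.assign(); // returns normally despite the queued error
        try {
            consumer.poll();
        } catch (RuntimeException e) {
            System.out.println("poll threw: " + e.getMessage());
        }
    }
}
```

With this shape, a test like `testMultipleBackgroundErrors` would still see the exception from poll rather than from assign.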
We only need to ensure that the assign does not return until the background event completes, so we don't really need to process background events; we just need `applicationEventHandler.addAndGet(assignmentChangeEvent)` (and I would ensure we create the event using the `defaultApiTimeout` for the deadline). Makes sense? Then we would keep the same behaviour we had on this test (assign does not throw any background error; poll does). Just for the record, on the unsubscribe we do need to process background events because of the rebalance callbacks that run in the app thread, but that's not the case here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org