lianetm commented on code in PR #16673:
URL: https://github.com/apache/kafka/pull/16673#discussion_r1721809134

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1443,12 +1442,10 @@ public void assign(Collection<TopicPartition> partitions) {
             // be no following rebalance.
             //
             // See the ApplicationEventProcessor.process() method that handles this event for more detail.
-            applicationEventHandler.add(new AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
-
-            log.info("Assigned to partition(s): {}", partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
-
-            if (subscriptions.assignFromUser(new HashSet<>(partitions)))
-                applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+            Timer timer = time.timer(defaultApiTimeoutMs);
+            AssignmentChangeEvent assignmentChangeEvent = new AssignmentChangeEvent(timer.currentTimeMs(), calculateDeadlineMs(timer), partitions);
+            applicationEventHandler.addAndGet(assignmentChangeEvent);
+            log.info("Assigned new partitions");

Review Comment:
   nit: should we remove this log line? We already have a very similar one that will show "Assigned to partitions...." when the actual change happens

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -118,13 +118,12 @@ private static Stream<Arguments> applicationEvents() {
         final long currentTimeMs = 12345;
         return Stream.of(
                 Arguments.of(new PollEvent(100)),
-                Arguments.of(new NewTopicsMetadataUpdateRequestEvent()),
                 Arguments.of(new AsyncCommitEvent(new HashMap<>())),
                 Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
                 Arguments.of(new ResetPositionsEvent(500)),
                 Arguments.of(new ValidatePositionsEvent(500)),
                 Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
-                Arguments.of(new AssignmentChangeEvent(offset, currentTimeMs)));
+                Arguments.of(new AssignmentChangeEvent(12345, 12345, Collections.emptyList())));

Review Comment:
   note that by adding this we're only testing that the call to `process(AppEvent)` ends up triggering the `process(AssignmentChangeEvent)`, nothing more. So should we add a test for the `process(AssignmentChangeEvent)`? We should ensure that it takes the actions we expect (commit, assign, request metadata update), using a `processor` instance, not a mock.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -2182,6 +2207,16 @@ private void completeUnsubscribeApplicationEventSuccessfully() {
         }).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
     }
+    private void completeAssignmentChangeEventSuccessfully() {
+        doAnswer(invocation -> {
+            AssignmentChangeEvent event = invocation.getArgument(0);
+            // In AsyncKafkaConsumer, the subscription is updated in the background thread.

Review Comment:
   nit: this inline comment doesn't seem to add much imo, the func name and body seem clear enough to me

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -118,13 +118,12 @@ private static Stream<Arguments> applicationEvents() {
         final long currentTimeMs = 12345;

Review Comment:
   this is not used anymore so let's just remove it

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org