lianetm commented on code in PR #16673:
URL: https://github.com/apache/kafka/pull/16673#discussion_r1721809134


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1443,12 +1442,10 @@ public void assign(Collection<TopicPartition> partitions) {
             // be no following rebalance.
             //
             // See the ApplicationEventProcessor.process() method that handles this event for more detail.
-            applicationEventHandler.add(new AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
-
-            log.info("Assigned to partition(s): {}", partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
-
-            if (subscriptions.assignFromUser(new HashSet<>(partitions)))
-                applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+            Timer timer = time.timer(defaultApiTimeoutMs);
+            AssignmentChangeEvent assignmentChangeEvent = new AssignmentChangeEvent(timer.currentTimeMs(), calculateDeadlineMs(timer), partitions);
+            applicationEventHandler.addAndGet(assignmentChangeEvent);
+            log.info("Assigned new partitions");

Review Comment:
   nit: should we remove this log line? We already have a very similar one that 
will log "Assigned to partitions..." when the actual change happens.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -118,13 +118,12 @@ private static Stream<Arguments> applicationEvents() {
         final long currentTimeMs = 12345;
         return Stream.of(
                 Arguments.of(new PollEvent(100)),
-                Arguments.of(new NewTopicsMetadataUpdateRequestEvent()),
                 Arguments.of(new AsyncCommitEvent(new HashMap<>())),
                 Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
                 Arguments.of(new ResetPositionsEvent(500)),
                 Arguments.of(new ValidatePositionsEvent(500)),
                 Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
-                Arguments.of(new AssignmentChangeEvent(offset, currentTimeMs)));
+                Arguments.of(new AssignmentChangeEvent(12345, 12345, Collections.emptyList())));

Review Comment:
   Note that by adding this we're only testing that the call to 
`process(AppEvent)` ends up triggering `process(AssignmentChangeEvent)`, 
nothing more. Should we also add a test for `process(AssignmentChangeEvent)` 
itself? It should verify that the method takes the actions we expect (commit, 
assign, request metadata update), using a real `processor` instance, not a mock.
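
   To illustrate the kind of test meant here, a minimal self-contained sketch 
follows. Everything in it is a hypothetical stand-in rather than the actual 
Kafka types: `RecordingState`, `Processor`, the string partition names, and the 
action labels are assumptions made only for illustration. The point is that the 
test drives a real processor object and asserts on what it did, instead of only 
checking that the dispatch reached the right overload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AssignmentChangeProcessorSketch {

    // Hypothetical stand-in for the event: it only carries the requested partitions.
    static final class AssignmentChangeEvent {
        final Collection<String> partitions;

        AssignmentChangeEvent(Collection<String> partitions) {
            this.partitions = partitions;
        }
    }

    // Hypothetical collaborator that records, in order, every action taken on it.
    static final class RecordingState {
        final List<String> actions = new ArrayList<>();
        private Set<String> assigned = new HashSet<>();

        void commit() {
            actions.add("commit");
        }

        // Returns true only when the assignment actually changed.
        boolean assign(Collection<String> partitions) {
            actions.add("assign");
            Set<String> requested = new HashSet<>(partitions);
            boolean changed = !requested.equals(assigned);
            assigned = requested;
            return changed;
        }

        void requestMetadataUpdate() {
            actions.add("metadata-update");
        }
    }

    // Stand-in processor (assumed behavior): commit, assign, then request a
    // metadata update only if the assignment changed.
    static final class Processor {
        private final RecordingState state;

        Processor(RecordingState state) {
            this.state = state;
        }

        void process(AssignmentChangeEvent event) {
            state.commit();
            if (state.assign(event.partitions)) {
                state.requestMetadataUpdate();
            }
        }
    }

    // Runs one event through a fresh, non-mocked processor and returns the recorded actions.
    static List<String> actionsFor(Collection<String> partitions) {
        RecordingState state = new RecordingState();
        new Processor(state).process(new AssignmentChangeEvent(partitions));
        return state.actions;
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("commit", "assign", "metadata-update");
        List<String> actual = actionsFor(Arrays.asList("topic-0", "topic-1"));
        if (!actual.equals(expected)) {
            throw new AssertionError("unexpected actions: " + actual);
        }
        System.out.println("processor actions: " + actual);
    }
}
```

   In the real test the collaborators would presumably still be mocks or test 
doubles; only the processor itself needs to be a real instance so that its 
handling logic is actually exercised.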



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -2182,6 +2207,16 @@ private void completeUnsubscribeApplicationEventSuccessfully() {
         }).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
     }
 
+    private void completeAssignmentChangeEventSuccessfully() {
+        doAnswer(invocation -> {
+            AssignmentChangeEvent event = invocation.getArgument(0);
+            // In AsyncKafkaConsumer, the subscription is updated in the background thread.

Review Comment:
   nit: this inline comment doesn't seem to add much imo; the function name and 
body seem clear enough to me.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -118,13 +118,12 @@ private static Stream<Arguments> applicationEvents() {
         final long currentTimeMs = 12345;

Review Comment:
   `currentTimeMs` is not used anymore, so let's just remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to