hachikuji commented on a change in pull request #8934:
URL: https://github.com/apache/kafka/pull/8934#discussion_r450504028



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -416,7 +428,13 @@ boolean joinGroupIfNeeded(final Timer timer) {
             }
 
             final RequestFuture<ByteBuffer> future = initiateJoinGroup();
-            client.poll(future, timer);
+
+            // if the flat is not set to true; we only try once and not block on the join result

Review comment:
       flag?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1258,11 +1261,19 @@ public void assign(Collection<TopicPartition> partitions) {
         }
     }
 
+    private boolean coordinatorNeededForAssignment() {

Review comment:
       nit: seems unused?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1218,17 +1218,20 @@ public void assign(Collection<TopicPartition> partitions) {
                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
             }
 
-            // poll for new data until the timeout expires
             do {
                 client.maybeTriggerWakeup();
 
                 if (includeMetadataInTimeout) {
-                    // try to update assignment metadata BUT do not need to block on the timer,
-                    // since even if we are 1) in the middle of a rebalance or 2) have partitions
-                    // with unknown starting positions we may still want to return some data
-                    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
-                    // to never block on completing the rebalance procedure if there's any
-                    updateAssignmentMetadataIfNeeded(time.timer(0L));
+                    // try to update assignment metadata BUT do not need to block on the timer if we still have
+                    // some assigned partitions, since even if we are 1) in the middle of a rebalance
+                    // or 2) have partitions with unknown starting positions we may still want to return some data
+                    // as long as there are some partitions fetchable; NOTE we do not block on rebalancing to complete
+                    // if there's one pending if we still have some fetchable partitions
+                    if (subscriptions.fetchablePartitions(tp -> true).isEmpty()) {

Review comment:
       This call to `fetchablePartitions` is unfortunate. It is an entire pass 
over all the assigned partitions on every poll. Is there any way it can be 
avoided?
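One way to avoid the per-poll scan would be to maintain a running count of fetchable partitions and update it only when a partition's state changes, so the "is anything fetchable?" check becomes O(1). The sketch below is a hypothetical illustration of that idea, not what the PR or `SubscriptionState` actually does; `PartitionTracker`, `assign`, `setFetchable`, `revoke` and `hasFetchablePartitions` are made-up names, and partitions are plain strings for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not a Kafka API): cache the number of fetchable partitions
// so the emptiness check does not need a pass over every assigned partition.
class PartitionTracker {
    // partition name -> whether it is currently fetchable
    private final Map<String, Boolean> fetchable = new HashMap<>();
    private int fetchableCount = 0;

    // Newly assigned partitions start as not fetchable (assumption for this sketch,
    // e.g. because their starting position is not yet known).
    void assign(String partition) {
        fetchable.putIfAbsent(partition, false);
    }

    // Adjust the cached count only when the state actually flips.
    void setFetchable(String partition, boolean isFetchable) {
        Boolean previous = fetchable.get(partition);
        if (previous == null)
            throw new IllegalStateException("Partition " + partition + " is not assigned");
        if (previous != isFetchable) {
            fetchable.put(partition, isFetchable);
            fetchableCount += isFetchable ? 1 : -1;
        }
    }

    // Removing a fetchable partition must also decrement the count.
    void revoke(String partition) {
        Boolean previous = fetchable.remove(partition);
        if (Boolean.TRUE.equals(previous))
            fetchableCount--;
    }

    // O(1) stand-in for subscriptions.fetchablePartitions(tp -> true).isEmpty() == false
    boolean hasFetchablePartitions() {
        return fetchableCount > 0;
    }
}
```

The trade-off is that every code path that changes a partition's fetchability (seek, pause/resume, revocation, position reset) has to keep the counter in sync, which is more error-prone than a scan but moves the cost off the hot `poll` path.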

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -416,7 +428,13 @@ boolean joinGroupIfNeeded(final Timer timer) {
             }
 
             final RequestFuture<ByteBuffer> future = initiateJoinGroup();
-            client.poll(future, timer);
+
+            // if the flat is not set to true; we only try once and not block on the join result
+            if (waitUntilComplete)
+                client.poll(future, timer);
+            else
+                client.poll(timer);

Review comment:
       Do we want this to be `poll(0)`? Otherwise we're still blocking here.
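To make the blocking concern concrete, the sketch below models the three call shapes on a simulated clock: `poll(future, timer)` (keep polling until the response arrives or time runs out), `poll(timer)` (wait for network activity or the timeout, whichever comes first), and a zero-timeout poll (return immediately, analogous to the `time.timer(0L)` used elsewhere in this diff). `FakeClient` and its methods are hypothetical stand-ins for `ConsumerNetworkClient`; no real I/O happens, the clock just advances by the time each call would block.

```java
// Hypothetical model (not Kafka code): a client whose only network event is the
// JoinGroup response arriving at a fixed simulated time.
class FakeClient {
    private long nowMs = 0;
    private final long responseArrivesAtMs;

    FakeClient(long responseArrivesAtMs) {
        this.responseArrivesAtMs = responseArrivesAtMs;
    }

    long nowMs() { return nowMs; }

    boolean responseReady() { return nowMs >= responseArrivesAtMs; }

    // Models poll(timer): wait until a response arrives or the timeout expires.
    // A timeout of 0 returns immediately without advancing the clock.
    void poll(long timeoutMs) {
        if (!responseReady())
            nowMs = Math.min(nowMs + timeoutMs, responseArrivesAtMs);
    }

    // Models poll(future, timer): keep polling until the response is ready or the deadline passes.
    void pollUntilComplete(long deadlineMs) {
        while (!responseReady() && nowMs < deadlineMs)
            poll(deadlineMs - nowMs);
    }
}
```

With a slow coordinator (response at 2000 ms) and a 1000 ms timer, `poll(1000)` still spends the full 1000 ms waiting even though the caller asked not to block on the join result, whereas `poll(0)` returns at 0 ms.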

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1218,17 +1218,20 @@ public void assign(Collection<TopicPartition> partitions) {
                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
             }
 
-            // poll for new data until the timeout expires
             do {
                 client.maybeTriggerWakeup();
 
                 if (includeMetadataInTimeout) {
-                    // try to update assignment metadata BUT do not need to block on the timer,
-                    // since even if we are 1) in the middle of a rebalance or 2) have partitions
-                    // with unknown starting positions we may still want to return some data
-                    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
-                    // to never block on completing the rebalance procedure if there's any
-                    updateAssignmentMetadataIfNeeded(time.timer(0L));
+                    // try to update assignment metadata BUT do not need to block on the timer if we still have

Review comment:
       Just making sure I understand the problem. In the old logic, 
`updateAssignmentMetadataIfNeeded` never blocks. Even if we have no assignment 
and do not know the coordinator, we won't block here. That means we should fall 
through to `pollForFetches`. The poll timeout is set by the following logic:
   ```java
           long pollTimeout = coordinator == null ? timer.remainingMs() :
                   Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
   ```
   The only way I can see this logic getting into trouble is if
`Heartbeat.timeToNextHeartbeat` returns a small value. So if the coordinator
remains unknown for a little while and we need to heartbeat, we might get into
a busy loop. Is that about right, or are there other cases?
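The busy-loop scenario can be illustrated by replaying the quoted `pollTimeout` formula on a simulated clock. In the sketch below (hypothetical names; `PollLoopSim` is not Kafka code), each loop iteration blocks for `pollTimeout` ms and nothing else advances time. If `timeToNextPoll` keeps returning 0, for example because a heartbeat is due but cannot be sent while the coordinator is unknown (the assumption from the comment above), the loop spins without waiting until it hits the iteration cap.

```java
// Hypothetical simulation of the poll-timeout computation quoted above.
final class PollLoopSim {
    // Mirrors: coordinator == null ? remainingMs : min(coordinator.timeToNextPoll(now), remainingMs)
    static long pollTimeout(boolean coordinatorPresent, long timeToNextPollMs, long remainingMs) {
        return coordinatorPresent ? Math.min(timeToNextPollMs, remainingMs) : remainingMs;
    }

    // Counts loop iterations until the timer expires, capped at maxIterations.
    // Assumes timeToNextPollMs stays constant, i.e. the heartbeat never succeeds.
    static int iterations(long timeToNextPollMs, long timerMs, int maxIterations) {
        long now = 0;
        int count = 0;
        while (now < timerMs && count < maxIterations) {
            now += pollTimeout(true, timeToNextPollMs, timerMs - now);
            count++;
        }
        return count;
    }
}
```

With a 100 ms poll timer, a `timeToNextPoll` of 3000 ms finishes in a single blocking iteration, 30 ms takes four iterations (30 + 30 + 30 + 10), and 0 ms never advances the clock, so the loop only stops at the cap.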




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

