guozhangwang commented on code in PR #13138:
URL: https://github.com/apache/kafka/pull/13138#discussion_r1083125061


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -149,11 +156,14 @@ private void onSuccessfulResponse(final FindCoordinatorResponseData.Coordinator
                 coordinator.host(),
                 coordinator.port());
         log.info("Discovered group coordinator {}", coordinator);
-        coordinatorRequestState.reset();
+        coordinatorRequestState.onSuccessfulAttempt(currentTimeMs);
     }
 
-    private void onFailedCoordinatorResponse(final Exception exception, final long currentTimeMs) {
-        coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+    private void onFailedResponse(
+        final long currentTimeMs,
+        final Throwable exception
+    ) {
+        coordinatorRequestState.onFailedAttempt(currentTimeMs);

Review Comment:
   How about moving this to the end of the function as well, since 
`nonRetriableErrorHandler.handle(exception);` may take time?
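
   A minimal sketch of the ordering suggested above, not the PR's actual code. `RequestState`, the `handler` parameter, and `demo` are hypothetical stand-ins introduced only for illustration; the real `coordinatorRequestState` and `nonRetriableErrorHandler` may behave differently. It only shows the handler running first and the failed-attempt bookkeeping happening last.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FailedResponseOrdering {

    /** Hypothetical stand-in for the request state; records when it is updated. */
    static final class RequestState {
        private final List<String> events;
        long lastFailedAttemptMs = -1L;

        RequestState(final List<String> events) {
            this.events = events;
        }

        void onFailedAttempt(final long currentTimeMs) {
            lastFailedAttemptMs = currentTimeMs;
            events.add("onFailedAttempt");
        }
    }

    /**
     * Runs the (possibly slow) error handler first and updates the
     * failed-attempt state at the very end of the function.
     */
    static void onFailedResponse(
        final long currentTimeMs,
        final Throwable exception,
        final Consumer<Throwable> handler,   // assumed stand-in for the error handler
        final RequestState state
    ) {
        handler.accept(exception);
        state.onFailedAttempt(currentTimeMs);
    }

    /** Returns the order in which the two steps ran. */
    static List<String> demo() {
        final List<String> events = new ArrayList<>();
        final RequestState state = new RequestState(events);
        onFailedResponse(100L, new RuntimeException("boom"), e -> events.add("handle"), state);
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```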



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -72,17 +73,16 @@ public NetworkClientDelegate(
      * @param currentTimeMs current time
      * @return a list of client response
      */
-    public List<ClientResponse> poll(final long timeoutMs, final long currentTimeMs) {
+    public void poll(final long timeoutMs, final long currentTimeMs) {
         trySend(currentTimeMs);
 
         long pollTimeoutMs = timeoutMs;
         if (!unsentRequests.isEmpty()) {
             pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs);
         }
-        List<ClientResponse> res = this.client.poll(pollTimeoutMs, currentTimeMs);
+
+        this.client.poll(pollTimeoutMs, currentTimeMs);
         checkDisconnects();
-        wakeup();

Review Comment:
   We can also remove the function at
https://github.com/apache/kafka/pull/13138/files#diff-a5be3b830b3475fd019bb6c218b7b81d5f5f25f77587f33e48c92c4dc4271ed5R168
   too, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -72,17 +73,16 @@ public NetworkClientDelegate(
      * @param currentTimeMs current time
      * @return a list of client response
      */
-    public List<ClientResponse> poll(final long timeoutMs, final long currentTimeMs) {
+    public void poll(final long timeoutMs, final long currentTimeMs) {
         trySend(currentTimeMs);
 
         long pollTimeoutMs = timeoutMs;
         if (!unsentRequests.isEmpty()) {
             pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs);
         }
-        List<ClientResponse> res = this.client.poll(pollTimeoutMs, currentTimeMs);
+
+        this.client.poll(pollTimeoutMs, currentTimeMs);
         checkDisconnects();
-        wakeup();

Review Comment:
   NVM, the client is not exposed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
