mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r654558597


##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -813,34 +815,56 @@ public void handle(SyncGroupResponse syncResponse,
      */
     private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
         // initiate the group metadata request
-        log.debug("Sending FindCoordinator request to broker {}", node);
-        FindCoordinatorRequest.Builder requestBuilder =
-                new FindCoordinatorRequest.Builder(
-                        new FindCoordinatorRequestData()
-                            .setKeyType(CoordinatorType.GROUP.id())
-                            .setKey(this.rebalanceConfig.groupId));
+        log.debug("Sending FindCoordinator request to broker {} with 
batch={}", node, batchFindCoordinator);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(CoordinatorType.GROUP.id());
+        if (batchFindCoordinator) {
+            
data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId));
+        } else {
+            data.setKey(this.rebalanceConfig.groupId);
+        }
+        FindCoordinatorRequest.Builder requestBuilder = new 
FindCoordinatorRequest.Builder(data);
         return client.send(node, requestBuilder)
-                .compose(new FindCoordinatorResponseHandler());
+                .compose(new 
FindCoordinatorResponseHandler(batchFindCoordinator));
     }
 
     private class FindCoordinatorResponseHandler extends 
RequestFutureAdapter<ClientResponse, Void> {
+        private boolean batch;
+        FindCoordinatorResponseHandler(boolean batch) {
+            this.batch = batch;
+        }
 
         @Override
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) 
{
             log.debug("Received FindCoordinator response {}", resp);
 
             FindCoordinatorResponse findCoordinatorResponse = 
(FindCoordinatorResponse) resp.responseBody();
-            Errors error = findCoordinatorResponse.error();
+            if (batch && findCoordinatorResponse.data().coordinators().size() 
!= 1) {
+                log.error("Group coordinator lookup failed: Invalid response 
containing more than a single coordinator");
+                future.raise(new IllegalStateException("Group coordinator 
lookup failed: Invalid response containing more than a single coordinator"));
+            }
+            Errors error = batch
+                    ? 
Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode())
+                    : findCoordinatorResponse.error();
             if (error == Errors.NONE) {
                 synchronized (AbstractCoordinator.this) {
+                    int nodeId = batch
+                            ? 
findCoordinatorResponse.data().coordinators().get(0).nodeId()

Review comment:
       This variable would only exist if `batch` is true which makes it pretty 
awkward.

##########
File path: clients/src/test/java/org/apache/kafka/clients/MockClient.java
##########
@@ -245,10 +246,17 @@ public void send(ClientRequest request, long now) {
                 unsupportedVersionException = new UnsupportedVersionException(
                         "Api " + request.apiKey() + " with version " + 
version);
             } else {
-                AbstractRequest abstractRequest = 
request.requestBuilder().build(version);
-                if (!futureResp.requestMatcher.matches(abstractRequest))
-                    throw new IllegalStateException("Request matcher did not 
match next-in-line request "
-                            + abstractRequest + " with prepared response " + 
futureResp.responseBody);
+                try {
+                    AbstractRequest abstractRequest = 
request.requestBuilder().build(version);
+                    if (!futureResp.requestMatcher.matches(abstractRequest))
+                        continue;

Review comment:
       Oops! Yes I removed this to silence failing tests while refactoring. 
I'll undo this.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
##########
@@ -57,13 +58,13 @@
      * Note that keys which received a retriable error should be left out of 
the
      * result. They will be retried automatically.
      *
-     * @param brokerId the brokerId that the associated request was sent to
+     * @param broker the broker that the associated request was sent to

Review comment:
       Not entirely sure about the new naming in place now, but does that still 
count as a broker? 

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
##########
@@ -57,13 +58,13 @@
      * Note that keys which received a retriable error should be left out of 
the
      * result. They will be retried automatically.
      *
-     * @param brokerId the brokerId that the associated request was sent to
+     * @param broker the broker that the associated request was sent to

Review comment:
       Ok, thanks for the clarifications. I feel like `broker` is still fine 
here. Otherwise, maybe `node`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to