mimaison commented on a change in pull request #10743: URL: https://github.com/apache/kafka/pull/10743#discussion_r654558597
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ########## @@ -813,34 +815,56 @@ public void handle(SyncGroupResponse syncResponse, */ private RequestFuture<Void> sendFindCoordinatorRequest(Node node) { // initiate the group metadata request - log.debug("Sending FindCoordinator request to broker {}", node); - FindCoordinatorRequest.Builder requestBuilder = - new FindCoordinatorRequest.Builder( - new FindCoordinatorRequestData() - .setKeyType(CoordinatorType.GROUP.id()) - .setKey(this.rebalanceConfig.groupId)); + log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator); + FindCoordinatorRequestData data = new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id()); + if (batchFindCoordinator) { + data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId)); + } else { + data.setKey(this.rebalanceConfig.groupId); + } + FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data); return client.send(node, requestBuilder) - .compose(new FindCoordinatorResponseHandler()); + .compose(new FindCoordinatorResponseHandler(batchFindCoordinator)); } private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> { + private boolean batch; + FindCoordinatorResponseHandler(boolean batch) { + this.batch = batch; + } @Override public void onSuccess(ClientResponse resp, RequestFuture<Void> future) { log.debug("Received FindCoordinator response {}", resp); FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody(); - Errors error = findCoordinatorResponse.error(); + if (batch && findCoordinatorResponse.data().coordinators().size() != 1) { + log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator"); + future.raise(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator")); + } + Errors error = batch + ? Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode()) + : findCoordinatorResponse.error(); if (error == Errors.NONE) { synchronized (AbstractCoordinator.this) { + int nodeId = batch + ? findCoordinatorResponse.data().coordinators().get(0).nodeId() Review comment: This variable would only exist if `batch` is true which makes it pretty awkward. ########## File path: clients/src/test/java/org/apache/kafka/clients/MockClient.java ########## @@ -245,10 +246,17 @@ public void send(ClientRequest request, long now) { unsupportedVersionException = new UnsupportedVersionException( "Api " + request.apiKey() + " with version " + version); } else { - AbstractRequest abstractRequest = request.requestBuilder().build(version); - if (!futureResp.requestMatcher.matches(abstractRequest)) - throw new IllegalStateException("Request matcher did not match next-in-line request " - + abstractRequest + " with prepared response " + futureResp.responseBody); + try { + AbstractRequest abstractRequest = request.requestBuilder().build(version); + if (!futureResp.requestMatcher.matches(abstractRequest)) + continue; Review comment: Oops! Yes I removed this to silence failing tests while refactoring. I'll undo this. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java ########## @@ -57,13 +58,13 @@ * Note that keys which received a retriable error should be left out of the * result. They will be retried automatically. * - * @param brokerId the brokerId that the associated request was sent to + * @param broker the broker that the associated request was sent to Review comment: Not entirely sure about the new naming in place now, but does that still count as a broker? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java ########## @@ -57,13 +58,13 @@ * Note that keys which received a retriable error should be left out of the * result. They will be retried automatically. * - * @param brokerId the brokerId that the associated request was sent to + * @param broker the broker that the associated request was sent to Review comment: Ok, thanks for the clarifications. I feel like `broker` is still fine here. Otherwise, maybe `node`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org