mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r654558597
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -813,34 +815,56 @@ public void handle(SyncGroupResponse syncResponse,
*/
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
// initiate the group metadata request
- log.debug("Sending FindCoordinator request to broker {}", node);
- FindCoordinatorRequest.Builder requestBuilder =
- new FindCoordinatorRequest.Builder(
- new FindCoordinatorRequestData()
- .setKeyType(CoordinatorType.GROUP.id())
- .setKey(this.rebalanceConfig.groupId));
+ log.debug("Sending FindCoordinator request to broker {} with
batch={}", node, batchFindCoordinator);
+ FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+ .setKeyType(CoordinatorType.GROUP.id());
+ if (batchFindCoordinator) {
+
data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId));
+ } else {
+ data.setKey(this.rebalanceConfig.groupId);
+ }
+ FindCoordinatorRequest.Builder requestBuilder = new
FindCoordinatorRequest.Builder(data);
return client.send(node, requestBuilder)
- .compose(new FindCoordinatorResponseHandler());
+ .compose(new
FindCoordinatorResponseHandler(batchFindCoordinator));
}
private class FindCoordinatorResponseHandler extends
RequestFutureAdapter<ClientResponse, Void> {
+ private boolean batch;
+ FindCoordinatorResponseHandler(boolean batch) {
+ this.batch = batch;
+ }
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future)
{
log.debug("Received FindCoordinator response {}", resp);
FindCoordinatorResponse findCoordinatorResponse =
(FindCoordinatorResponse) resp.responseBody();
- Errors error = findCoordinatorResponse.error();
+ if (batch && findCoordinatorResponse.data().coordinators().size()
!= 1) {
+ log.error("Group coordinator lookup failed: Invalid response
containing more than a single coordinator");
+ future.raise(new IllegalStateException("Group coordinator
lookup failed: Invalid response containing more than a single coordinator"));
+ }
+ Errors error = batch
+ ?
Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode())
+ : findCoordinatorResponse.error();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
+ int nodeId = batch
+ ?
findCoordinatorResponse.data().coordinators().get(0).nodeId()
Review comment:
This variable would only exist if `batch` is true which makes it pretty
awkward.
##########
File path: clients/src/test/java/org/apache/kafka/clients/MockClient.java
##########
@@ -245,10 +246,17 @@ public void send(ClientRequest request, long now) {
unsupportedVersionException = new UnsupportedVersionException(
"Api " + request.apiKey() + " with version " +
version);
} else {
- AbstractRequest abstractRequest =
request.requestBuilder().build(version);
- if (!futureResp.requestMatcher.matches(abstractRequest))
- throw new IllegalStateException("Request matcher did not
match next-in-line request "
- + abstractRequest + " with prepared response " +
futureResp.responseBody);
+ try {
+ AbstractRequest abstractRequest =
request.requestBuilder().build(version);
+ if (!futureResp.requestMatcher.matches(abstractRequest))
+ continue;
Review comment:
Oops! Yes I removed this to silence failing tests while refactoring.
I'll undo this.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
##########
@@ -57,13 +58,13 @@
* Note that keys which received a retriable error should be left out of
the
* result. They will be retried automatically.
*
- * @param brokerId the brokerId that the associated request was sent to
+ * @param broker the broker that the associated request was sent to
Review comment:
Not entirely sure about the new naming in place now, but does that still
count as a broker?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
##########
@@ -57,13 +58,13 @@
* Note that keys which received a retriable error should be left out of
the
* result. They will be retried automatically.
*
- * @param brokerId the brokerId that the associated request was sent to
+ * @param broker the broker that the associated request was sent to
Review comment:
Ok, thanks for the clarifications. I feel like `broker` is still fine
here. Otherwise, maybe `node`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]