dajac commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1461922947
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -820,6 +848,8 @@ private void onFailure(final long currentTimeMs,
} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
Review Comment:
Do you know why we don't handle `COORDINATOR_NOT_AVAILABLE`? I think that we
should treat it like `NOT_COORDINATOR`.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) {
continue;
}
- if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
+ if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+ return;
+ } else if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
error == Errors.NOT_COORDINATOR ||
error == Errors.REQUEST_TIMED_OUT) {
coordinatorRequestManager.markCoordinatorUnknown(error.message(),
currentTimeMs);
maybeRetry(currentTimeMs, error.exception());
return;
+ } else if (error == Errors.FENCED_INSTANCE_ID) {
+ log.info("OffsetCommit failed due to group instance id
{} fenced: {}", groupInstanceId, error.message());
+ future.completeExceptionally(new
CommitFailedException());
Review Comment:
nit: While we are here, should we put an error msg?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) {
continue;
}
- if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
+ if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+ return;
+ } else if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
error == Errors.NOT_COORDINATOR ||
error == Errors.REQUEST_TIMED_OUT) {
coordinatorRequestManager.markCoordinatorUnknown(error.message(),
currentTimeMs);
maybeRetry(currentTimeMs, error.exception());
return;
+ } else if (error == Errors.FENCED_INSTANCE_ID) {
+ log.info("OffsetCommit failed due to group instance id
{} fenced: {}", groupInstanceId, error.message());
+ future.completeExceptionally(new
CommitFailedException());
+ return;
+ } else if (error == Errors.OFFSET_METADATA_TOO_LARGE ||
+ error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
+ future.completeExceptionally(error.exception());
+ return;
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS ||
error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
// just retry
Review Comment:
Should we fail the future and rely and the outer retry loop now?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -770,8 +785,22 @@ public NetworkClientDelegate.UnsentRequest
toUnsentRequest() {
new ArrayList<>(this.requestedPartitions),
throwOnFetchStableOffsetUnsupported);
}
- return new NetworkClientDelegate.UnsentRequest(builder,
coordinatorRequestManager.coordinator())
- .whenComplete((r, t) -> onResponse(r.receivedTimeMs(),
(OffsetFetchResponse) r.responseBody()));
+ NetworkClientDelegate.UnsentRequest request =
+ new NetworkClientDelegate.UnsentRequest(builder,
coordinatorRequestManager.coordinator());
+ return request.whenComplete((response, error) -> {
+ if (error == null) {
+ onResponse(response.receivedTimeMs(),
(OffsetFetchResponse) response.responseBody());
+ } else {
+ log.debug("OffsetFetch completed with error for partitions
{}", requestedPartitions, error);
+ long currentTimeMs = request.handler().completionTimeMs();
+ handleCoordinatorDisconnect(error, currentTimeMs);
+ if (error instanceof RetriableException) {
+ maybeRetry(currentTimeMs, error);
+ } else {
+ future.completeExceptionally(error);
+ }
+ }
Review Comment:
This code looks pretty similar to the one that we have for the offset
commit. Is there an opportunity to consolidate/reuse?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]