guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1142683410


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
         }
     }
 
+    private class UnsentOffsetFetchRequest extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
+
+        public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+                                        final GroupState.Generation generation,
+                                        final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = new CompletableFuture<>();
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+            return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
+        }
+
+        public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long 
currentTimeMs) {
+            OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
+                    groupState.groupId,
+                    true,
+                    new ArrayList<>(this.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+                    builder,
+                    coordinatorRequestManager.coordinator());
+            unsentRequest.future().whenComplete((r, t) -> {
+                onResponse(currentTimeMs, (OffsetFetchResponse) 
r.responseBody());
+            });
+            return unsentRequest;
+        }
+
+        public void onResponse(
+                final long currentTimeMs,
+                final OffsetFetchResponse response) {
+            Errors responseError = 
response.groupLevelError(groupState.groupId);
+            if (responseError != Errors.NONE) {
+                onFailure(currentTimeMs, responseError);
+                return;
+            }
+            onSuccess(currentTimeMs, response);
+        }
+
+        private void onFailure(final long currentTimeMs,
+                               final Errors responseError) {
+            log.debug("Offset fetch failed: {}", responseError.message());
+
+            // TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
+            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                retry(currentTimeMs);
+            } else if (responseError == Errors.NOT_COORDINATOR) {
+                // re-discover the coordinator and retry
+                
coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), 
Time.SYSTEM.milliseconds());
+                retry(currentTimeMs);
+            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+                
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+            } else {
+                future.completeExceptionally(new KafkaException("Unexpected 
error in fetch offset response: " + responseError.message()));
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            onFailedAttempt(currentTimeMs);
+            onSendAttempt(currentTimeMs);
+            pendingRequests.addOffsetFetchRequest(this);
+        }
+
+        private void onSuccess(final long currentTimeMs,
+                               final OffsetFetchResponse response) {
+            Set<String> unauthorizedTopics = null;
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData =
+                    response.partitionDataMap(groupState.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(responseData.size());
+            Set<TopicPartition> unstableTxnOffsetTopicPartitions = new 
HashSet<>();
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : responseData.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
+                if (partitionData.hasError()) {
+                    Errors error = partitionData.error;
+                    log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
+
+                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        future.completeExceptionally(new KafkaException("Topic 
or Partition " + tp + " does " +
+                                "not " +
+                                "exist"));
+                        return;
+                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                        if (unauthorizedTopics == null) {
+                            unauthorizedTopics = new HashSet<>();
+                        }
+                        unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
+                        unstableTxnOffsetTopicPartitions.add(tp);
+                    } else {
+                        future.completeExceptionally(new 
KafkaException("Unexpected error in fetch offset " +
+                                "response for partition " + tp + ": " + 
error.message()));
+                        return;
+                    }
+                } else if (partitionData.offset >= 0) {
+                    // record the position with the offset (-1 indicates no 
committed offset to fetch);
+                    // if there's no committed offset, record as null
+                    offsets.put(tp, new 
OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, 
partitionData.metadata));
+                } else {
+                    log.info("Found no committed offset for partition {}", tp);
+                    offsets.put(tp, null);
+                }
+            }
+
+            if (unauthorizedTopics != null) {
+                future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
+                // TODO: Optimization question: Do we need to retry all 
partitions upon a single partition error?
+                log.info("The following partitions still have unstable offsets 
" +
+                        "which are not cleared on the broker side: {}" +
+                        ", this could be either " +
+                        "transactional offsets waiting for completion, or " +
+                        "normal offsets waiting for replication after 
appending to local log", unstableTxnOffsetTopicPartitions);
+                retry(currentTimeMs);
+            } else {
+                future.complete(offsets);
+            }
+        }
+
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future) {
+            return this.future.whenComplete((r, t) -> {
+                if (t != null) {
+                    future.completeExceptionally(t);
+                } else {
+                    future.complete(r);
+                }
+            });
+        }
+    }
+
+    /**
+     * <p>This is used to stage the unsent requests, i.e., {@link 
UnsentOffsetCommitRequest} and {@link UnsentOffsetFetchRequest}.
+     *
+     * <p>If the request is new. It will be enqueued and will be sent upon the 
next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code 
CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * <p>There is a special handling for the {@link 
UnsentOffsetFetchRequest}. If a duplicated request was sent
+     * previously, we will chain the future to the current future.
+     */
+
+    class PendingRequests {
+        // Queue is used to ensure the sequence of commit
+        Queue<UnsentOffsetCommitRequest> unsentOffsetCommits = new 
LinkedList<>();
+        List<UnsentOffsetFetchRequest> unsentOffsetFetches = new ArrayList<>();
+        List<UnsentOffsetFetchRequest> inflightOffsetFetches = new 
ArrayList<>();
+
+        public boolean hasUnsentRequests() {
+            return !unsentOffsetCommits.isEmpty() || 
!unsentOffsetFetches.isEmpty();
+        }
+
+        public CompletableFuture<ClientResponse> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   nit: we can also leave a TODO comment here to de-dup offset commit requests 
if they are committing the exactly same offset for those partitions?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
         }
     }
 
+    private class UnsentOffsetFetchRequest extends RequestState {

Review Comment:
   How about renaming to `OffsetFetchRequestState`, and ditto for 
`OffsetCommitRequestState`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
         }
     }
 
+    private class UnsentOffsetFetchRequest extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
+
+        public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+                                        final GroupState.Generation generation,
+                                        final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = new CompletableFuture<>();
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+            return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
+        }
+
+        public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long 
currentTimeMs) {
+            OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
+                    groupState.groupId,
+                    true,
+                    new ArrayList<>(this.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+                    builder,
+                    coordinatorRequestManager.coordinator());
+            unsentRequest.future().whenComplete((r, t) -> {
+                onResponse(currentTimeMs, (OffsetFetchResponse) 
r.responseBody());
+            });
+            return unsentRequest;
+        }
+
+        public void onResponse(
+                final long currentTimeMs,
+                final OffsetFetchResponse response) {
+            Errors responseError = 
response.groupLevelError(groupState.groupId);
+            if (responseError != Errors.NONE) {
+                onFailure(currentTimeMs, responseError);
+                return;
+            }
+            onSuccess(currentTimeMs, response);
+        }
+
+        private void onFailure(final long currentTimeMs,
+                               final Errors responseError) {
+            log.debug("Offset fetch failed: {}", responseError.message());
+
+            // TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
+            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                retry(currentTimeMs);
+            } else if (responseError == Errors.NOT_COORDINATOR) {
+                // re-discover the coordinator and retry
+                
coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), 
Time.SYSTEM.milliseconds());
+                retry(currentTimeMs);
+            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+                
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+            } else {
+                future.completeExceptionally(new KafkaException("Unexpected 
error in fetch offset response: " + responseError.message()));
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            onFailedAttempt(currentTimeMs);
+            onSendAttempt(currentTimeMs);
+            pendingRequests.addOffsetFetchRequest(this);
+        }
+
+        private void onSuccess(final long currentTimeMs,
+                               final OffsetFetchResponse response) {
+            Set<String> unauthorizedTopics = null;
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData =
+                    response.partitionDataMap(groupState.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(responseData.size());
+            Set<TopicPartition> unstableTxnOffsetTopicPartitions = new 
HashSet<>();
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : responseData.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
+                if (partitionData.hasError()) {
+                    Errors error = partitionData.error;
+                    log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
+
+                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        future.completeExceptionally(new KafkaException("Topic 
or Partition " + tp + " does " +
+                                "not " +
+                                "exist"));
+                        return;
+                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                        if (unauthorizedTopics == null) {
+                            unauthorizedTopics = new HashSet<>();
+                        }
+                        unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
+                        unstableTxnOffsetTopicPartitions.add(tp);
+                    } else {
+                        future.completeExceptionally(new 
KafkaException("Unexpected error in fetch offset " +
+                                "response for partition " + tp + ": " + 
error.message()));
+                        return;
+                    }
+                } else if (partitionData.offset >= 0) {
+                    // record the position with the offset (-1 indicates no 
committed offset to fetch);
+                    // if there's no committed offset, record as null
+                    offsets.put(tp, new 
OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, 
partitionData.metadata));
+                } else {
+                    log.info("Found no committed offset for partition {}", tp);
+                    offsets.put(tp, null);
+                }
+            }
+
+            if (unauthorizedTopics != null) {
+                future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
+                // TODO: Optimization question: Do we need to retry all 
partitions upon a single partition error?
+                log.info("The following partitions still have unstable offsets 
" +
+                        "which are not cleared on the broker side: {}" +
+                        ", this could be either " +
+                        "transactional offsets waiting for completion, or " +
+                        "normal offsets waiting for replication after 
appending to local log", unstableTxnOffsetTopicPartitions);
+                retry(currentTimeMs);
+            } else {
+                future.complete(offsets);
+            }
+        }
+
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future) {
+            return this.future.whenComplete((r, t) -> {
+                if (t != null) {
+                    future.completeExceptionally(t);
+                } else {
+                    future.complete(r);
+                }
+            });
+        }
+    }
+
+    /**
+     * <p>This is used to stage the unsent requests, i.e., {@link 
UnsentOffsetCommitRequest} and {@link UnsentOffsetFetchRequest}.
+     *
+     * <p>If the request is new. It will be enqueued and will be sent upon the 
next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code 
CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * <p>There is a special handling for the {@link 
UnsentOffsetFetchRequest}. If a duplicated request was sent
+     * previously, we will chain the future to the current future.
+     */
+
+    class PendingRequests {
+        // Queue is used to ensure the sequence of commit
+        Queue<UnsentOffsetCommitRequest> unsentOffsetCommits = new 
LinkedList<>();
+        List<UnsentOffsetFetchRequest> unsentOffsetFetches = new ArrayList<>();
+        List<UnsentOffsetFetchRequest> inflightOffsetFetches = new 
ArrayList<>();
+
+        public boolean hasUnsentRequests() {
+            return !unsentOffsetCommits.isEmpty() || 
!unsentOffsetFetches.isEmpty();
+        }
+
+        public CompletableFuture<ClientResponse> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
+            UnsentOffsetCommitRequest request = new UnsentOffsetCommitRequest(
+                    offsets,
+                    groupState.groupId,
+                    groupState.groupInstanceId.orElse(null),
+                    groupState.generation);
+            unsentOffsetCommits.add(request);
+            return request.future();
+        }
+
+        /**
+         *  We want to avoid duplication when sending an {@link 
OffsetFetchRequest}. The following checks are done:
+         *  <li>1. dedupe against unsents: if a duplicated request was 
previously made, we chain the future</>
+         *  <li>2. dedupe against incompleted: If a duplicated request was 
sent but hasn't gotten a results, we chain
+         *  the future.</>
+         *
+         *  <p>If the request is new, we chain a call back to remove itself 
from the {@code inflightOffsetFetches}
+         *  upon completion.</>
+         */
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final UnsentOffsetFetchRequest request) {
+            Optional<UnsentOffsetFetchRequest> dupe =
+                    unsentOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
+            Optional<UnsentOffsetFetchRequest> inflight =
+                    inflightOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
+
+            if (dupe.isPresent() || inflight.isPresent()) {
+                log.info("Duplicated OffsetFetchRequest: " + 
request.requestedPartitions);
+                dupe.orElseGet(() -> 
inflight.get()).chainFuture(request.future);
+            } else {
+                // remove the request from the outbound buffer: 
inflightOffsetFetches
+                request.future.whenComplete((r, t) -> {
+                    if (!inflightOffsetFetches.remove(request)) {

Review Comment:
   Just curious when do you think this could ever happen?
   
   One case I was thinking is if the consumer is shutting down, but I also feel 
that when we shutdown a consumer, we should try to stop the background thread 
first, and then clear the buffer, so that the response would not be received 
while the buffer is cleared.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -113,15 +112,45 @@ private void maybeAutoCommit(final long currentTimeMs) {
         }
 
         Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
subscriptionState.allConsumed();
-        log.debug("Auto-committing offsets {}", allConsumedOffsets);
         sendAutoCommit(allConsumedOffsets);
         autocommit.resetTimer();
+    }
+
+    /**
+     * Handles {@link 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It 
creates an
+     * {@link UnsentOffsetCommitRequest} and enqueue it to send later.
+     */
+    public CompletableFuture<ClientResponse> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
+        return pendingRequests.addOffsetCommitRequest(offsets);
+    }
+
+    /**
+     * Handles {@link 
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}.
 It creates an
+     * {@link UnsentOffsetFetchRequest} and enqueue it to send later.
+     */
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final Set<TopicPartition> partitions) {
+        return pendingRequests.addOffsetFetchRequest(partitions);
+    }
+
+    public void clientPoll(final long currentTimeMs) {
+        this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
+    }
+
 
+    // Visible for testing
+    List<UnsentOffsetFetchRequest> unsentOffsetFetchRequests() {
+        return pendingRequests.unsentOffsetFetches;
+    }
+
+    // Visible for testing
+    Queue<UnsentOffsetCommitRequest> unsentOffsetCommitRequests() {
+        return pendingRequests.unsentOffsetCommits;
     }
 
     // Visible for testing
     CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, 
OffsetAndMetadata> allConsumedOffsets) {
-        CompletableFuture<ClientResponse> future = this.add(allConsumedOffsets)
+        log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets);
+        return this.addOffsetCommitRequest(allConsumedOffsets)

Review Comment:
   I have a question for line 159 below: it seems we only 
`setInflightCommitStatus(false)`, but never `setInflightCommitStatus(true)`. 
That means `hasInflightCommit` would always be false? Or did I miss anything?
   
   Also I feel that with the current `pending request` queue, we may not need 
the `hasInflightCommit` flag any more --- as long as there are commit requests 
in the pending queue that covers all the partitions in interest, we can omit 
sending that again, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
         }
     }
 
+    private class UnsentOffsetFetchRequest extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
+
+        public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+                                        final GroupState.Generation generation,
+                                        final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = new CompletableFuture<>();
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+            return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
+        }
+
+        public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long 
currentTimeMs) {
+            OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
+                    groupState.groupId,
+                    true,
+                    new ArrayList<>(this.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+                    builder,
+                    coordinatorRequestManager.coordinator());
+            unsentRequest.future().whenComplete((r, t) -> {
+                onResponse(currentTimeMs, (OffsetFetchResponse) 
r.responseBody());
+            });
+            return unsentRequest;
+        }
+
+        public void onResponse(
+                final long currentTimeMs,
+                final OffsetFetchResponse response) {
+            Errors responseError = 
response.groupLevelError(groupState.groupId);
+            if (responseError != Errors.NONE) {
+                onFailure(currentTimeMs, responseError);
+                return;
+            }
+            onSuccess(currentTimeMs, response);
+        }
+
+        private void onFailure(final long currentTimeMs,
+                               final Errors responseError) {
+            log.debug("Offset fetch failed: {}", responseError.message());
+
+            // TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
+            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                retry(currentTimeMs);
+            } else if (responseError == Errors.NOT_COORDINATOR) {
+                // re-discover the coordinator and retry
+                
coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), 
Time.SYSTEM.milliseconds());
+                retry(currentTimeMs);
+            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+                
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+            } else {
+                future.completeExceptionally(new KafkaException("Unexpected 
error in fetch offset response: " + responseError.message()));
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            onFailedAttempt(currentTimeMs);
+            onSendAttempt(currentTimeMs);
+            pendingRequests.addOffsetFetchRequest(this);
+        }
+
+        private void onSuccess(final long currentTimeMs,
+                               final OffsetFetchResponse response) {
+            Set<String> unauthorizedTopics = null;
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData =
+                    response.partitionDataMap(groupState.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(responseData.size());
+            Set<TopicPartition> unstableTxnOffsetTopicPartitions = new 
HashSet<>();
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : responseData.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
+                if (partitionData.hasError()) {
+                    Errors error = partitionData.error;
+                    log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
+
+                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        future.completeExceptionally(new KafkaException("Topic 
or Partition " + tp + " does " +
+                                "not " +
+                                "exist"));
+                        return;
+                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                        if (unauthorizedTopics == null) {
+                            unauthorizedTopics = new HashSet<>();
+                        }
+                        unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
+                        unstableTxnOffsetTopicPartitions.add(tp);
+                    } else {
+                        future.completeExceptionally(new 
KafkaException("Unexpected error in fetch offset " +
+                                "response for partition " + tp + ": " + 
error.message()));
+                        return;
+                    }
+                } else if (partitionData.offset >= 0) {
+                    // record the position with the offset (-1 indicates no 
committed offset to fetch);
+                    // if there's no committed offset, record as null
+                    offsets.put(tp, new 
OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, 
partitionData.metadata));
+                } else {
+                    log.info("Found no committed offset for partition {}", tp);
+                    offsets.put(tp, null);
+                }
+            }
+
+            if (unauthorizedTopics != null) {
+                future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
+                // TODO: Optimization question: Do we need to retry all 
partitions upon a single partition error?
+                log.info("The following partitions still have unstable offsets 
" +
+                        "which are not cleared on the broker side: {}" +
+                        ", this could be either " +
+                        "transactional offsets waiting for completion, or " +
+                        "normal offsets waiting for replication after 
appending to local log", unstableTxnOffsetTopicPartitions);
+                retry(currentTimeMs);
+            } else {
+                future.complete(offsets);
+            }
+        }
+
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future) {
+            return this.future.whenComplete((r, t) -> {
+                if (t != null) {
+                    future.completeExceptionally(t);
+                } else {
+                    future.complete(r);
+                }
+            });
+        }
+    }
+
+    /**
+     * <p>This is used to stage the unsent requests, i.e., {@link 
UnsentOffsetCommitRequest} and {@link UnsentOffsetFetchRequest}.
+     *
+     * <p>If the request is new. It will be enqueued and will be sent upon the 
next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code 
CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * <p>There is a special handling for the {@link 
UnsentOffsetFetchRequest}. If a duplicated request was sent
+     * previously, we will chain the future to the current future.
+     */
+
+    class PendingRequests {
+        // Queue is used to ensure the sequence of commit
+        Queue<UnsentOffsetCommitRequest> unsentOffsetCommits = new 
LinkedList<>();
+        List<UnsentOffsetFetchRequest> unsentOffsetFetches = new ArrayList<>();
+        List<UnsentOffsetFetchRequest> inflightOffsetFetches = new 
ArrayList<>();
+
+        public boolean hasUnsentRequests() {
+            return !unsentOffsetCommits.isEmpty() || 
!unsentOffsetFetches.isEmpty();
+        }
+
+        public CompletableFuture<ClientResponse> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
+            UnsentOffsetCommitRequest request = new UnsentOffsetCommitRequest(
+                    offsets,
+                    groupState.groupId,
+                    groupState.groupInstanceId.orElse(null),
+                    groupState.generation);
+            unsentOffsetCommits.add(request);
+            return request.future();
+        }
+
+        /**
+         *  We want to avoid duplication when sending an {@link 
OffsetFetchRequest}. The following checks are done:
+         *  <li>1. dedupe against unsents: if a duplicated request was 
previously made, we chain the future</>
+         *  <li>2. dedupe against incompleted: If a duplicated request was 
sent but hasn't gotten a results, we chain
+         *  the future.</>
+         *
+         *  <p>If the request is new, we chain a call back to remove itself 
from the {@code inflightOffsetFetches}
+         *  upon completion.</>
+         */
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final UnsentOffsetFetchRequest request) {
+            Optional<UnsentOffsetFetchRequest> dupe =
+                    unsentOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
+            Optional<UnsentOffsetFetchRequest> inflight =
+                    inflightOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
+
+            if (dupe.isPresent() || inflight.isPresent()) {
+                log.info("Duplicated OffsetFetchRequest: " + 
request.requestedPartitions);
+                dupe.orElseGet(() -> 
inflight.get()).chainFuture(request.future);
+            } else {
+                // remove the request from the outbound buffer: 
inflightOffsetFetches
+                request.future.whenComplete((r, t) -> {
+                    if (!inflightOffsetFetches.remove(request)) {
+                        log.info("unable to remove request from the outbound 
buffer:" + request);
+                    }
+                });
+                this.unsentOffsetFetches.add(request);
+            }
+            return request.future;
+        }
+
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final Set<TopicPartition> partitions) {
+            UnsentOffsetFetchRequest request = new UnsentOffsetFetchRequest(
+                    partitions,
+                    groupState.generation,
+                    retryBackoffMs);
+            return addOffsetFetchRequest(request);
+        }
+
+        public List<NetworkClientDelegate.UnsentRequest> drain(final long 
currentTimeMs) {
+            List<NetworkClientDelegate.UnsentRequest> unsent = new 
ArrayList<>();
+            unsent.addAll(unsentOffsetCommits.stream()
+                    .map(UnsentOffsetCommitRequest::toUnsentRequest)
+                    .collect(Collectors.toList()));
+            List<UnsentOffsetFetchRequest> sendables = 
unsentOffsetFetches.stream()
+                    .filter(r -> r.canSendRequest(currentTimeMs))
+                    .collect(Collectors.toList());
+            inflightOffsetFetches.addAll(sendables);
+            unsent.addAll(sendables.stream()
+                    .peek(r -> r.onSendAttempt(currentTimeMs))
+                    .map(r -> r.toUnsentRequest(currentTimeMs))
+                    .collect(Collectors.toList()));
+            // empty the staged requests

Review Comment:
   This comment seems redundant and can be removed, as it is pretty clear from 
the code. Also the term `staged` maybe confusing as we are now in `unsent` 
vocabulary.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -113,15 +112,45 @@ private void maybeAutoCommit(final long currentTimeMs) {
         }
 
         Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
subscriptionState.allConsumed();
-        log.debug("Auto-committing offsets {}", allConsumedOffsets);
         sendAutoCommit(allConsumedOffsets);
         autocommit.resetTimer();
+    }
+
+    /**
+     * Handles {@link 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It 
creates an
+     * {@link UnsentOffsetCommitRequest} and enqueue it to send later.
+     */
+    public CompletableFuture<ClientResponse> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
+        return pendingRequests.addOffsetCommitRequest(offsets);
+    }
+
+    /**
+     * Handles {@link 
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}.
 It creates an
+     * {@link UnsentOffsetFetchRequest} and enqueue it to send later.
+     */
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final Set<TopicPartition> partitions) {
+        return pendingRequests.addOffsetFetchRequest(partitions);
+    }
+
+    public void clientPoll(final long currentTimeMs) {

Review Comment:
   The func name `clientPoll` and `poll` are a bit less distinguishable. It 
seems the former function would only be used to update the autoCommit state of 
the commitManager, right? If we do not expect this function to be extended for 
other logic, could we just rename it to `updateAutoCommitTimer`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
         }
     }
 
+    private class UnsentOffsetFetchRequest extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
+
+        public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+                                        final GroupState.Generation generation,
+                                        final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = new CompletableFuture<>();
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+            return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
+        }
+
+        public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long 
currentTimeMs) {
+            OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
+                    groupState.groupId,
+                    true,
+                    new ArrayList<>(this.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+                    builder,
+                    coordinatorRequestManager.coordinator());
+            unsentRequest.future().whenComplete((r, t) -> {
+                onResponse(currentTimeMs, (OffsetFetchResponse) 
r.responseBody());
+            });
+            return unsentRequest;
+        }
+
+        public void onResponse(
+                final long currentTimeMs,
+                final OffsetFetchResponse response) {
+            Errors responseError = 
response.groupLevelError(groupState.groupId);
+            if (responseError != Errors.NONE) {
+                onFailure(currentTimeMs, responseError);
+                return;
+            }
+            onSuccess(currentTimeMs, response);
+        }
+
+        private void onFailure(final long currentTimeMs,
+                               final Errors responseError) {
+            log.debug("Offset fetch failed: {}", responseError.message());
+
+            // TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
+            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                retry(currentTimeMs);
+            } else if (responseError == Errors.NOT_COORDINATOR) {
+                // re-discover the coordinator and retry
+                
coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), 
Time.SYSTEM.milliseconds());
+                retry(currentTimeMs);
+            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+                
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+            } else {
+                future.completeExceptionally(new KafkaException("Unexpected 
error in fetch offset response: " + responseError.message()));
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            onFailedAttempt(currentTimeMs);
+            onSendAttempt(currentTimeMs);
+            pendingRequests.addOffsetFetchRequest(this);
+        }
+
+        private void onSuccess(final long currentTimeMs,
+                               final OffsetFetchResponse response) {
+            Set<String> unauthorizedTopics = null;
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData =
+                    response.partitionDataMap(groupState.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(responseData.size());
+            Set<TopicPartition> unstableTxnOffsetTopicPartitions = new 
HashSet<>();
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : responseData.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
+                if (partitionData.hasError()) {
+                    Errors error = partitionData.error;
+                    log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
+
+                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        future.completeExceptionally(new KafkaException("Topic 
or Partition " + tp + " does " +
+                                "not " +
+                                "exist"));
+                        return;
+                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                        if (unauthorizedTopics == null) {
+                            unauthorizedTopics = new HashSet<>();
+                        }
+                        unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
+                        unstableTxnOffsetTopicPartitions.add(tp);
+                    } else {
+                        future.completeExceptionally(new 
KafkaException("Unexpected error in fetch offset " +
+                                "response for partition " + tp + ": " + 
error.message()));
+                        return;
+                    }
+                } else if (partitionData.offset >= 0) {
+                    // record the position with the offset (-1 indicates no 
committed offset to fetch);
+                    // if there's no committed offset, record as null
+                    offsets.put(tp, new 
OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, 
partitionData.metadata));
+                } else {
+                    log.info("Found no committed offset for partition {}", tp);
+                    offsets.put(tp, null);
+                }
+            }
+
+            if (unauthorizedTopics != null) {
+                future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
+                // TODO: Optimization question: Do we need to retry all 
partitions upon a single partition error?
+                log.info("The following partitions still have unstable offsets 
" +
+                        "which are not cleared on the broker side: {}" +
+                        ", this could be either " +
+                        "transactional offsets waiting for completion, or " +
+                        "normal offsets waiting for replication after 
appending to local log", unstableTxnOffsetTopicPartitions);
+                retry(currentTimeMs);
+            } else {
+                future.complete(offsets);
+            }
+        }
+
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future) {
+            return this.future.whenComplete((r, t) -> {
+                if (t != null) {
+                    future.completeExceptionally(t);
+                } else {
+                    future.complete(r);
+                }
+            });
+        }
+    }
+
+    /**
+     * <p>This is used to stage the unsent requests, i.e., {@link 
UnsentOffsetCommitRequest} and {@link UnsentOffsetFetchRequest}.
+     *
+     * <p>If the request is new. It will be enqueued and will be sent upon the 
next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code 
CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * <p>There is a special handling for the {@link 
UnsentOffsetFetchRequest}. If a duplicated request was sent
+     * previously, we will chain the future to the current future.
+     */
+
+    class PendingRequests {
+        // Queue is used to ensure the sequence of commit
+        Queue<UnsentOffsetCommitRequest> unsentOffsetCommits = new 
LinkedList<>();
+        List<UnsentOffsetFetchRequest> unsentOffsetFetches = new ArrayList<>();
+        List<UnsentOffsetFetchRequest> inflightOffsetFetches = new 
ArrayList<>();
+
+        public boolean hasUnsentRequests() {
+            return !unsentOffsetCommits.isEmpty() || 
!unsentOffsetFetches.isEmpty();
+        }
+
+        public CompletableFuture<ClientResponse> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
+            UnsentOffsetCommitRequest request = new UnsentOffsetCommitRequest(
+                    offsets,
+                    groupState.groupId,
+                    groupState.groupInstanceId.orElse(null),
+                    groupState.generation);
+            unsentOffsetCommits.add(request);
+            return request.future();
+        }
+
+        /**
+         *  We want to avoid duplication when sending an {@link 
OffsetFetchRequest}. The following checks are done:
+         *  <li>1. dedupe against unsents: if a duplicated request was 
previously made, we chain the future</>
+         *  <li>2. dedupe against incompleted: If a duplicated request was 
sent but hasn't gotten a results, we chain
+         *  the future.</>
+         *
+         *  <p>If the request is new, we chain a call back to remove itself 
from the {@code inflightOffsetFetches}
+         *  upon completion.</>
+         */
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final UnsentOffsetFetchRequest request) {
+            Optional<UnsentOffsetFetchRequest> dupe =
+                    unsentOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
+            Optional<UnsentOffsetFetchRequest> inflight =
+                    inflightOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
+
+            if (dupe.isPresent() || inflight.isPresent()) {
+                log.info("Duplicated OffsetFetchRequest: " + 
request.requestedPartitions);
+                dupe.orElseGet(() -> 
inflight.get()).chainFuture(request.future);
+            } else {
+                // remove the request from the outbound buffer: 
inflightOffsetFetches
+                request.future.whenComplete((r, t) -> {
+                    if (!inflightOffsetFetches.remove(request)) {
+                        log.info("unable to remove request from the outbound 
buffer:" + request);
+                    }
+                });
+                this.unsentOffsetFetches.add(request);
+            }
+            return request.future;
+        }
+
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final Set<TopicPartition> partitions) {
+            UnsentOffsetFetchRequest request = new UnsentOffsetFetchRequest(
+                    partitions,
+                    groupState.generation,
+                    retryBackoffMs);
+            return addOffsetFetchRequest(request);
+        }
+
+        public List<NetworkClientDelegate.UnsentRequest> drain(final long 
currentTimeMs) {
+            List<NetworkClientDelegate.UnsentRequest> unsent = new 
ArrayList<>();
+            unsent.addAll(unsentOffsetCommits.stream()
+                    .map(UnsentOffsetCommitRequest::toUnsentRequest)
+                    .collect(Collectors.toList()));
+            List<UnsentOffsetFetchRequest> sendables = 
unsentOffsetFetches.stream()
+                    .filter(r -> r.canSendRequest(currentTimeMs))
+                    .collect(Collectors.toList());
+            inflightOffsetFetches.addAll(sendables);
+            unsent.addAll(sendables.stream()
+                    .peek(r -> r.onSendAttempt(currentTimeMs))
+                    .map(r -> r.toUnsentRequest(currentTimeMs))
+                    .collect(Collectors.toList()));
+            // empty the staged requests
+            unsentOffsetCommits.clear();

Review Comment:
   This seems not right to me as well: if an unsent request is filtered in 
`.filter(r -> r.canSendRequest(currentTimeMs))` they should still be kept in 
the queue, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -137,31 +166,21 @@ CompletableFuture<ClientResponse> sendAutoCommit(final 
Map<TopicPartition, Offse
                     }
                     return null;
                 });
-        return future;
-    }
-
-    public void clientPoll(final long currentTimeMs) {
-        this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
     }
 
-    // Visible for testing
-    Queue<StagedCommit> stagedCommits() {
-        return this.stagedCommits;
-    }
 
-    private class StagedCommit {
+    private class UnsentOffsetCommitRequest {
         private final Map<TopicPartition, OffsetAndMetadata> offsets;
         private final String groupId;
         private final GroupState.Generation generation;
         private final String groupInstanceId;
         private final NetworkClientDelegate.FutureCompletionHandler future;
 
-        public StagedCommit(final Map<TopicPartition, OffsetAndMetadata> 
offsets,
-                            final String groupId,
-                            final String groupInstanceId,
-                            final GroupState.Generation generation) {
+        public UnsentOffsetCommitRequest(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
+                                         final String groupId,
+                                         final String groupInstanceId,
+                                         final GroupState.Generation 
generation) {
             this.offsets = offsets;
-            // if no callback is provided, DefaultOffsetCommitCallback will be 
used.

Review Comment:
   I'm wondering in the new code, did we still trigger 
`DefaultOffsetCommitCallback` if user does not specify a callback? Currently it 
seems we always just use a plain `FutureCompletionHandler.future` without 
registering any callback. Is that a behavioral change?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +223,209 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
         }
     }
 
+    private class UnsentOffsetFetchRequest extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
+
+        public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+                                        final GroupState.Generation generation,
+                                        final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = new CompletableFuture<>();
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+            return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
+        }
+
+        public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long 
currentTimeMs) {
+            OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
+                    groupState.groupId,
+                    true,
+                    new ArrayList<>(this.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+                    builder,
+                    coordinatorRequestManager.coordinator());
+            unsentRequest.future().whenComplete((r, t) -> {
+                onResponse(currentTimeMs, (OffsetFetchResponse) 
r.responseBody());
+            });
+            return unsentRequest;
+        }
+
+        public void onResponse(

Review Comment:
   Not sure I understand this comment?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
             this.timer.update(currentTimeMs);
         }
     }
+
+    private class FetchCommittedOffsetResponseHandler {

Review Comment:
   Ack, let's keep the current behavior for now.
   
   Maybe we can add a TODO here to consider retry for that error in the future 
as a behavioral change?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
         }
     }
 
+    private class UnsentOffsetFetchRequest extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
+
+        public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+                                        final GroupState.Generation generation,
+                                        final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = new CompletableFuture<>();
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+            return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
+        }
+
+        public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long 
currentTimeMs) {
+            OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
+                    groupState.groupId,
+                    true,
+                    new ArrayList<>(this.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+                    builder,
+                    coordinatorRequestManager.coordinator());
+            unsentRequest.future().whenComplete((r, t) -> {
+                onResponse(currentTimeMs, (OffsetFetchResponse) 
r.responseBody());
+            });
+            return unsentRequest;
+        }
+
+        public void onResponse(
+                final long currentTimeMs,
+                final OffsetFetchResponse response) {
+            Errors responseError = 
response.groupLevelError(groupState.groupId);
+            if (responseError != Errors.NONE) {
+                onFailure(currentTimeMs, responseError);
+                return;
+            }
+            onSuccess(currentTimeMs, response);
+        }
+
+        private void onFailure(final long currentTimeMs,
+                               final Errors responseError) {
+            log.debug("Offset fetch failed: {}", responseError.message());
+
+            // TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
+            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                retry(currentTimeMs);
+            } else if (responseError == Errors.NOT_COORDINATOR) {
+                // re-discover the coordinator and retry
+                
coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), 
Time.SYSTEM.milliseconds());
+                retry(currentTimeMs);
+            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+                
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+            } else {
+                future.completeExceptionally(new KafkaException("Unexpected 
error in fetch offset response: " + responseError.message()));
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            onFailedAttempt(currentTimeMs);
+            onSendAttempt(currentTimeMs);
+            pendingRequests.addOffsetFetchRequest(this);
+        }
+
+        private void onSuccess(final long currentTimeMs,
+                               final OffsetFetchResponse response) {
+            Set<String> unauthorizedTopics = null;
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData =
+                    response.partitionDataMap(groupState.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(responseData.size());
+            Set<TopicPartition> unstableTxnOffsetTopicPartitions = new 
HashSet<>();
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : responseData.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
+                if (partitionData.hasError()) {
+                    Errors error = partitionData.error;
+                    log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
+
+                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        future.completeExceptionally(new KafkaException("Topic 
or Partition " + tp + " does " +
+                                "not " +
+                                "exist"));
+                        return;
+                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                        if (unauthorizedTopics == null) {
+                            unauthorizedTopics = new HashSet<>();
+                        }
+                        unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
+                        unstableTxnOffsetTopicPartitions.add(tp);
+                    } else {
+                        future.completeExceptionally(new 
KafkaException("Unexpected error in fetch offset " +
+                                "response for partition " + tp + ": " + 
error.message()));
+                        return;
+                    }
+                } else if (partitionData.offset >= 0) {
+                    // record the position with the offset (-1 indicates no 
committed offset to fetch);
+                    // if there's no committed offset, record as null
+                    offsets.put(tp, new 
OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, 
partitionData.metadata));
+                } else {
+                    log.info("Found no committed offset for partition {}", tp);
+                    offsets.put(tp, null);
+                }
+            }
+
+            if (unauthorizedTopics != null) {
+                future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
+                // TODO: Optimization question: Do we need to retry all 
partitions upon a single partition error?
+                log.info("The following partitions still have unstable offsets 
" +
+                        "which are not cleared on the broker side: {}" +
+                        ", this could be either " +
+                        "transactional offsets waiting for completion, or " +
+                        "normal offsets waiting for replication after 
appending to local log", unstableTxnOffsetTopicPartitions);
+                retry(currentTimeMs);
+            } else {
+                future.complete(offsets);
+            }
+        }
+
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future) {
+            return this.future.whenComplete((r, t) -> {
+                if (t != null) {
+                    future.completeExceptionally(t);
+                } else {
+                    future.complete(r);
+                }
+            });
+        }
+    }
+
+    /**
+     * <p>This is used to stage the unsent requests, i.e., {@link 
UnsentOffsetCommitRequest} and {@link UnsentOffsetFetchRequest}.
+     *
+     * <p>If the request is new. It will be enqueued and will be sent upon the 
next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code 
CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * <p>There is a special handling for the {@link 
UnsentOffsetFetchRequest}. If a duplicated request was sent
+     * previously, we will chain the future to the current future.
+     */
+
+    class PendingRequests {
+        // Queue is used to ensure the sequence of commit
+        Queue<UnsentOffsetCommitRequest> unsentOffsetCommits = new 
LinkedList<>();
+        List<UnsentOffsetFetchRequest> unsentOffsetFetches = new ArrayList<>();
+        List<UnsentOffsetFetchRequest> inflightOffsetFetches = new 
ArrayList<>();
+
+        public boolean hasUnsentRequests() {
+            return !unsentOffsetCommits.isEmpty() || 
!unsentOffsetFetches.isEmpty();
+        }
+
+        public CompletableFuture<ClientResponse> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
+            UnsentOffsetCommitRequest request = new UnsentOffsetCommitRequest(
+                    offsets,
+                    groupState.groupId,
+                    groupState.groupInstanceId.orElse(null),
+                    groupState.generation);
+            unsentOffsetCommits.add(request);
+            return request.future();
+        }
+
+        /**
+         *  We want to avoid duplication when sending an {@link 
OffsetFetchRequest}. The following checks are done:
+         *  <li>1. dedupe against unsents: if a duplicated request was 
previously made, we chain the future</>
+         *  <li>2. dedupe against incompleted: If a duplicated request was 
sent but hasn't gotten a results, we chain
+         *  the future.</>
+         *
+         *  <p>If the request is new, we chain a call back to remove itself 
from the {@code inflightOffsetFetches}
+         *  upon completion.</>
+         */
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final UnsentOffsetFetchRequest request) {
+            Optional<UnsentOffsetFetchRequest> dupe =
+                    unsentOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
+            Optional<UnsentOffsetFetchRequest> inflight =
+                    inflightOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
+
+            if (dupe.isPresent() || inflight.isPresent()) {
+                log.info("Duplicated OffsetFetchRequest: " + 
request.requestedPartitions);
+                dupe.orElseGet(() -> 
inflight.get()).chainFuture(request.future);
+            } else {
+                // remove the request from the outbound buffer: 
inflightOffsetFetches
+                request.future.whenComplete((r, t) -> {
+                    if (!inflightOffsetFetches.remove(request)) {
+                        log.info("unable to remove request from the outbound 
buffer:" + request);
+                    }
+                });
+                this.unsentOffsetFetches.add(request);
+            }
+            return request.future;
+        }
+
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final Set<TopicPartition> partitions) {
+            UnsentOffsetFetchRequest request = new UnsentOffsetFetchRequest(
+                    partitions,
+                    groupState.generation,
+                    retryBackoffMs);
+            return addOffsetFetchRequest(request);
+        }
+
+        public List<NetworkClientDelegate.UnsentRequest> drain(final long 
currentTimeMs) {
+            List<NetworkClientDelegate.UnsentRequest> unsent = new 
ArrayList<>();
+            unsent.addAll(unsentOffsetCommits.stream()
+                    .map(UnsentOffsetCommitRequest::toUnsentRequest)
+                    .collect(Collectors.toList()));
+            List<UnsentOffsetFetchRequest> sendables = 
unsentOffsetFetches.stream()
+                    .filter(r -> r.canSendRequest(currentTimeMs))
+                    .collect(Collectors.toList());
+            inflightOffsetFetches.addAll(sendables);

Review Comment:
   This seems not right to me: if we add the inflight request to sendables 
again and return in `drain`, then they will be re-sent by the network client 
delegate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to