guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1136315295


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
 
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+        }
+    }
+
+    /**
+     * <p>This is used to support the committed() API. Here we use a Java collection, {@code unsentRequests}, to
+     * track the OffsetFetchRequests that haven't been sent, to prevent sending the same request twice in the
+     * same batch.
+     *
+     * <p>If the request is new, it will be enqueued to {@code unsentRequests} and sent upon the next poll.
+     *
+     * <p>If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * TODO: There is an optimization to prevent duplicating sent but incomplete requests. I'm not sure if we
+     * need that.
+     */
+    class UnsentOffsetFetchRequests {

Review Comment:
   Thinking a bit further here: I believe that inside
`unsentOffsetFetchRequests` we'd need to keep two collections: 1) the unsent
requests, and 2) the sent-but-not-responded requests. Upon getting a new event
from the queue, we would check it against both collections. Then, when we drain
the first collection and write its requests to the network client, we move them
to the second collection, and only drop a request from the second collection
after its response is received and the registered handlers have been triggered
(it's possible to have multiple events' handlers registered for the same
request).
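
   To make the idea concrete, here is a rough sketch of the two-collection
bookkeeping. It is only an illustration under simplifying assumptions, not
the PR's code: `OffsetFetchDedupSketch`, `Request`, `enqueue`, `drain` and
`onResponse` are hypothetical names, partitions are plain strings, the
generation check is omitted, and a single `Long` stands in for the fetched
offsets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: none of these names come from the PR.
final class OffsetFetchDedupSketch {

    // Stand-in for an offset fetch request; the real state also carries the generation.
    static final class Request {
        final Set<String> partitions;
        // One request may serve several events, so it keeps every registered handler.
        final List<CompletableFuture<Long>> handlers = new ArrayList<>();

        Request(Set<String> partitions) {
            this.partitions = partitions;
        }
    }

    private final List<Request> unsent = new ArrayList<>();   // 1) not yet written to the network client
    private final List<Request> inflight = new ArrayList<>(); // 2) sent but not yet responded

    // New event: check both collections before creating a new request.
    CompletableFuture<Long> enqueue(Set<String> partitions) {
        Request existing = find(unsent, partitions);
        if (existing == null) {
            existing = find(inflight, partitions);
        }
        if (existing == null) {
            existing = new Request(partitions);
            unsent.add(existing);
        }
        CompletableFuture<Long> future = new CompletableFuture<>();
        existing.handlers.add(future);
        return future;
    }

    // Drain the unsent requests for sending and move them to the in-flight collection.
    List<Request> drain() {
        List<Request> drained = new ArrayList<>(unsent);
        inflight.addAll(drained);
        unsent.clear();
        return drained;
    }

    // Response received: drop the request and complete every registered handler.
    void onResponse(Request request, Long offset) {
        if (inflight.remove(request)) {
            for (CompletableFuture<Long> handler : request.handlers) {
                handler.complete(offset);
            }
        }
    }

    int unsentCount() {
        return unsent.size();
    }

    int inflightCount() {
        return inflight.size();
    }

    private static Request find(List<Request> requests, Set<String> partitions) {
        for (Request r : requests) {
            if (r.partitions.equals(partitions)) {
                return r;
            }
        }
        return null;
    }
}
```

   With this shape, a second `enqueue` for the same partitions attaches a new
future to the existing request whether it is still unsent or already in
flight, and all attached futures complete together when the response arrives.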


