philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1136434874
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } + static class UnsentOffsetFetchRequestState extends RequestState { + public final Set<TopicPartition> requestedPartitions; + public final GroupState.Generation requestedGeneration; + public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future; + + public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions, + final GroupState.Generation generation, + final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future, + final long retryBackoffMs) { + super(retryBackoffMs); + this.requestedPartitions = partitions; + this.requestedGeneration = generation; + this.future = future; + } + + public boolean sameRequest(final UnsentOffsetFetchRequestState request) { + return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); + } + } + + /** + * <p>This is used to support the committed() API. Here we use a Java Collections, {@code unsentRequests}, to + * track + * the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch. + * + * <p>If the request is new. It will be enqueued to the {@code unsentRequest}, and will be sent upon the next + * poll. + * + * <p>If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the + * completion of the existing one. + * + * TODO: There is an optimization to present duplication to the sent but incompleted requests. I'm not sure if we + * need that. + */ + class UnsentOffsetFetchRequests { Review Comment: Hi! That was my original attempt :) , but I abandon it because I was wondering if we actually need this optimization? I think if we call committed() several times consecutively within a single event loop, it makes sense to coalesce them into a single request. Once we send out the request, I wonder how often do we invoke another committed() before the request gets sent out. Is it a use case from the stream side? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org