AndrewJSchofield commented on code in PR #17136: URL: https://github.com/apache/kafka/pull/17136#discussion_r1754436434
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -84,10 +86,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
     private Uuid memberId;
     private boolean fetchMoreRecords = false;
     private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsMap;
-    private final Map<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStates;
+    private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
     private final long retryBackoffMs;
     private final long retryBackoffMaxMs;
-    private boolean closing = false;
     private final CompletableFuture<Void> closeFuture;

Review Comment:
   It seems to me that the handling of `closeFuture` is flawed. An `AcknowledgeRequestState` is node-specific, but the future is associated with the entire request manager. It's only valid to complete the future once all close acks have been completed, not just the first one.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -1011,33 +1036,47 @@ public void completeIfEmpty() {
         }
     }

-    static class Pair<V> {
+    static class Tuple<V> {
         private V asyncRequest;
-        private V syncRequest;
+        private Queue<V> syncRequestQueue;
+        private V closeRequest;

-        public Pair(V asyncRequest, V syncRequest) {
+        public Tuple(V asyncRequest, Queue<V> syncRequestQueue, V closeRequest) {
             this.asyncRequest = asyncRequest;
-            this.syncRequest = syncRequest;
+            this.syncRequestQueue = syncRequestQueue;
+            this.closeRequest = closeRequest;
         }

         public void setAsyncRequest(V asyncRequest) {
             this.asyncRequest = asyncRequest;
         }

-        public void setSyncRequest(V second) {
-            this.syncRequest = second;
+        public void setSyncRequestQueue(Queue<V> syncRequestQueue) {

Review Comment:
   You could encapsulate this queue entirely in this class. For example, `addSyncRequest` could create the queue if it's not yet been created.
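
For illustration only, here is a minimal sketch of how the two review suggestions might look. It is not code from the PR. The `CloseTracker` helper, its method names, the two-argument `Tuple` constructor, and the `pollSyncRequest`/`hasSyncRequests` accessors are all hypothetical names introduced for this example. The only names taken from the review are `Tuple`, `addSyncRequest`, and `closeFuture`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a Tuple that owns its sync request queue, so callers
// never create, pass in, or replace the queue themselves.
class Tuple<V> {
    private V asyncRequest;
    private Queue<V> syncRequestQueue; // created lazily by addSyncRequest
    private V closeRequest;

    // Hypothetical constructor: the queue is no longer a parameter.
    Tuple(V asyncRequest, V closeRequest) {
        this.asyncRequest = asyncRequest;
        this.closeRequest = closeRequest;
    }

    void setAsyncRequest(V asyncRequest) { this.asyncRequest = asyncRequest; }
    V getAsyncRequest() { return asyncRequest; }

    void setCloseRequest(V closeRequest) { this.closeRequest = closeRequest; }
    V getCloseRequest() { return closeRequest; }

    // Creates the queue on first use, as suggested in the review.
    void addSyncRequest(V syncRequest) {
        if (syncRequestQueue == null) {
            syncRequestQueue = new ArrayDeque<>();
        }
        syncRequestQueue.add(syncRequest);
    }

    // Hypothetical accessors that hide whether the queue exists yet.
    boolean hasSyncRequests() {
        return syncRequestQueue != null && !syncRequestQueue.isEmpty();
    }

    V pollSyncRequest() {
        return syncRequestQueue == null ? null : syncRequestQueue.poll();
    }
}

// Hypothetical helper: one future for the whole request manager,
// completed only after every node's close acknowledgement is done.
class CloseTracker {
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
    private final AtomicInteger remaining;

    CloseTracker(int nodesWithCloseAcks) {
        this.remaining = new AtomicInteger(nodesWithCloseAcks);
        if (nodesWithCloseAcks == 0) {
            closeFuture.complete(null); // nothing to wait for
        }
    }

    // Called once per node when that node's close ack has completed.
    void onCloseAckCompleted() {
        if (remaining.decrementAndGet() == 0) {
            closeFuture.complete(null);
        }
    }

    CompletableFuture<Void> closeFuture() { return closeFuture; }
}
```

With two nodes outstanding, the first call to `onCloseAckCompleted()` leaves the future incomplete and only the second completes it. Whether a counter, a set of node IDs, or a check over `acknowledgeRequestStates` fits the PR best is a design choice for the author.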