AndrewJSchofield commented on code in PR #17136:
URL: https://github.com/apache/kafka/pull/17136#discussion_r1754436434


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -84,10 +86,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
     private Uuid memberId;
     private boolean fetchMoreRecords = false;
     private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsMap;
-    private final Map<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStates;
+    private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
     private final long retryBackoffMs;
     private final long retryBackoffMaxMs;
-    private boolean closing = false;
     private final CompletableFuture<Void> closeFuture;

Review Comment:
   It seems to me that the handling of `closeFuture` is flawed. An `AcknowledgeRequestState` is node-specific, but the future is associated with the entire request manager. It's only valid to complete the future once the close acknowledgements for all nodes have completed, not just the first one.
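
   One way to picture the point above is a minimal sketch that tracks the nodes with an outstanding close acknowledgement and completes the shared future only when none remain. `CloseAckTracker`, `onCloseAckCompleted` and the node-id set are hypothetical names for illustration, not part of the PR or of the Kafka code base, and the sketch assumes all calls come from a single thread.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical helper (not from the PR): completes one manager-wide future
 * only after every node has finished its close acknowledgement.
 * Assumes single-threaded use, so no synchronization is shown.
 */
class CloseAckTracker {
    // Node ids that still have a close acknowledgement in flight.
    private final Set<Integer> pendingNodeIds = new HashSet<>();
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

    CloseAckTracker(Set<Integer> nodeIds) {
        pendingNodeIds.addAll(nodeIds);
        // Nothing to wait for: the close is trivially complete.
        if (pendingNodeIds.isEmpty()) {
            closeFuture.complete(null);
        }
    }

    /** Called when the close acknowledgement for one node has finished. */
    void onCloseAckCompleted(int nodeId) {
        pendingNodeIds.remove(nodeId);
        // Complete the shared future only when the last node is done.
        if (pendingNodeIds.isEmpty()) {
            closeFuture.complete(null);
        }
    }

    CompletableFuture<Void> closeFuture() {
        return closeFuture;
    }
}
```

   With two nodes, the future stays incomplete after the first `onCloseAckCompleted` call and completes on the second, which is the behaviour the comment asks for.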



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -1011,33 +1036,47 @@ public void completeIfEmpty() {
         }
     }
 
-    static class Pair<V> {
+    static class Tuple<V> {
         private V asyncRequest;
-        private V syncRequest;
+        private Queue<V> syncRequestQueue;
+        private V closeRequest;
 
-        public Pair(V asyncRequest, V syncRequest) {
+        public Tuple(V asyncRequest, Queue<V> syncRequestQueue, V closeRequest) {
             this.asyncRequest = asyncRequest;
-            this.syncRequest = syncRequest;
+            this.syncRequestQueue = syncRequestQueue;
+            this.closeRequest = closeRequest;
         }
 
         public void setAsyncRequest(V asyncRequest) {
             this.asyncRequest = asyncRequest;
         }
 
-        public void setSyncRequest(V second) {
-            this.syncRequest = second;
+        public void setSyncRequestQueue(Queue<V> syncRequestQueue) {

Review Comment:
   You could encapsulate this queue entirely in this class. For example, `addSyncRequest` could create the queue if it has not yet been created.
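
   A rough sketch of that suggestion, assuming the queue is never handed in or out of the class: `addSyncRequest` creates the queue lazily, and callers go through small accessor methods. Only the `Tuple` name, its three fields and `addSyncRequest` come from the diff and the comment; `peekSyncRequest`, `pollSyncRequest`, `hasSyncRequests` and the choice of `ArrayDeque` are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Sketch only: a Tuple that owns its sync request queue. */
class Tuple<V> {
    private V asyncRequest;
    private Queue<V> syncRequestQueue; // created on first addSyncRequest call
    private V closeRequest;

    Tuple(V asyncRequest, V closeRequest) {
        this.asyncRequest = asyncRequest;
        this.closeRequest = closeRequest;
    }

    /** Adds a sync request, creating the queue if it does not exist yet. */
    void addSyncRequest(V syncRequest) {
        if (syncRequestQueue == null) {
            syncRequestQueue = new ArrayDeque<>();
        }
        syncRequestQueue.add(syncRequest);
    }

    /** Returns the oldest sync request without removing it, or null if none. */
    V peekSyncRequest() {
        return syncRequestQueue == null ? null : syncRequestQueue.peek();
    }

    /** Removes and returns the oldest sync request, or null if none. */
    V pollSyncRequest() {
        return syncRequestQueue == null ? null : syncRequestQueue.poll();
    }

    boolean hasSyncRequests() {
        return syncRequestQueue != null && !syncRequestQueue.isEmpty();
    }

    V getAsyncRequest() {
        return asyncRequest;
    }

    void setAsyncRequest(V asyncRequest) {
        this.asyncRequest = asyncRequest;
    }

    V getCloseRequest() {
        return closeRequest;
    }

    void setCloseRequest(V closeRequest) {
        this.closeRequest = closeRequest;
    }
}
```

   This would remove the need for `setSyncRequestQueue` and for the queue parameter in the constructor, since no caller ever has to construct or replace the queue itself.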


