apoorvmittal10 commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933528377


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java:
##########
@@ -35,15 +36,19 @@ public class Acknowledgements {
     // The acknowledgements keyed by offset. If the record is a gap, the AcknowledgeType will be null.
     private final Map<Long, AcknowledgeType> acknowledgements;
 
-    // When the broker responds to the acknowledgements, this is the error code returned.
-    private Errors acknowledgeErrorCode;
+    // When the broker responds to the acknowledgements, this is the exception thrown.
+    private KafkaException acknowledgeException;
+
+    private boolean completed;

Review Comment:
   Should this variable have a comment, like the other fields in the class?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * This class combines Acknowledgements with the id of the node to use for acknowledging.
+ */
+public class NodeAcknowledgements {
+    private final int nodeId;
+    private final Acknowledgements acknowledgements;
+
+    public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) {
+        this.nodeId = nodeId;
+        this.acknowledgements = acknowledgements;
+    }
+
+    public int getNodeId() {

Review Comment:
   ```suggestion
       public int nodeId() {
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * This class combines Acknowledgements with the id of the node to use for acknowledging.
+ */
+public class NodeAcknowledgements {

Review Comment:
   nit: It seems this could be a `record` class.
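
   For illustration, a minimal sketch of what the record form might look like. It assumes the module's target Java version supports records (Java 16 or later), which is not confirmed here. The empty `Acknowledgements` class below is a hypothetical stand-in so the snippet compiles on its own.

   ```java
   // Hypothetical stand-in for the real Acknowledgements class, present only
   // so that this sketch is self-contained.
   final class Acknowledgements {
   }

   // As a record, the compiler generates the canonical constructor, the
   // nodeId() and acknowledgements() accessors, equals, hashCode and toString.
   record NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) {
   }
   ```

   A record would also give the accessors their prefix-less names (`nodeId()`, `acknowledgements()`) without any hand-written getters.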



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java:
##########
@@ -301,10 +307,12 @@ private boolean canOptimiseForSingleAcknowledgeType(AcknowledgementBatch acknowl
     public String toString() {
         StringBuilder sb = new StringBuilder("Acknowledgements(");
         sb.append(acknowledgements);
-        if (acknowledgeErrorCode != null) {
-            sb.append(", errorCode=");
-            sb.append(acknowledgeErrorCode.code());
+        if (acknowledgeException != null) {
+            sb.append(", acknowledgeException=");
+            sb.append(Errors.forException(acknowledgeException));
         }

Review Comment:
   Should this be appended without the `if` check, so that the error code is always logged explicitly?
   
   ```
   sb.append(", errorCode=");
   sb.append(acknowledgeException != null ? Errors.forException(acknowledgeException) : "null");
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java:
##########
@@ -115,25 +120,26 @@ public int size() {
      * @return Whether the acknowledgements were sent to the broker and a response received
      */
     public boolean isCompleted() {
-        return acknowledgeErrorCode != null;
+        return completed;
     }
 
     /**
-     * Set the acknowledgement error code when the response has been received from the broker.
+     * Set the acknowledgement exception when the response has been received from the broker.
      *
-     * @param acknowledgeErrorCode the error code
+     * @param acknowledgeException the exception (will be null if successful)
      */
-    public void setAcknowledgeErrorCode(Errors acknowledgeErrorCode) {
-        this.acknowledgeErrorCode = acknowledgeErrorCode;
+    public void setAcknowledgeException(KafkaException acknowledgeException) {
+        completed = true;
+        this.acknowledgeException = acknowledgeException;
     }
 
     /**
-     * Get the acknowledgement error code when the response has been received from the broker.
+     * Get the acknowledgement exception when the response has been received from the broker.
      *
      * @return the error code
      */
-    public Errors getAcknowledgeErrorCode() {
-        return acknowledgeErrorCode;
+    public KafkaException getAcknowledgeException() {

Review Comment:
   We generally avoid the `get` prefix on methods, so should it be as below?
   ```suggestion
       public KafkaException acknowledgeException() {
   ```
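
   For context, a minimal sketch of how the class would read with that rename, alongside the `completed` flag from this diff. `AckState` is a hypothetical stand-in for `Acknowledgements`, and `RuntimeException` stands in for `KafkaException` so that the snippet compiles without the Kafka client on the classpath.

   ```java
   // Hypothetical stand-in for Acknowledgements; RuntimeException stands in
   // for KafkaException.
   final class AckState {
       // The exception from the broker response, or null if it was successful.
       private RuntimeException acknowledgeException;

       // Whether a response has been received. Tracked separately because a
       // null exception now means success rather than "not yet completed".
       private boolean completed;

       public boolean isCompleted() {
           return completed;
       }

       public void setAcknowledgeException(RuntimeException acknowledgeException) {
           completed = true;
           this.acknowledgeException = acknowledgeException;
       }

       // Accessor named without the `get` prefix.
       public RuntimeException acknowledgeException() {
           return acknowledgeException;
       }
   }
   ```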



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * This class combines Acknowledgements with the id of the node to use for acknowledging.
+ */
+public class NodeAcknowledgements {
+    private final int nodeId;
+    private final Acknowledgements acknowledgements;
+
+    public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) {
+        this.nodeId = nodeId;
+        this.acknowledgements = acknowledgements;
+    }
+
+    public int getNodeId() {
+        return nodeId;
+    }
+
+    public Acknowledgements getAcknowledgements() {

Review Comment:
   ```suggestion
       public Acknowledgements acknowledgements() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
