apoorvmittal10 commented on code in PR #18672: URL: https://github.com/apache/kafka/pull/18672#discussion_r1933528377
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java: ########## @@ -35,15 +36,19 @@ public class Acknowledgements { // The acknowledgements keyed by offset. If the record is a gap, the AcknowledgeType will be null. private final Map<Long, AcknowledgeType> acknowledgements; - // When the broker responds to the acknowledgements, this is the error code returned. - private Errors acknowledgeErrorCode; + // When the broker responds to the acknowledgements, this is the exception thrown. + private KafkaException acknowledgeException; + + private boolean completed; Review Comment: Should we add a comment on this variable, as the other fields in the class have? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +/** + * This class combines Acknowledgements with the id of the node to use for acknowledging. 
+ */ +public class NodeAcknowledgements { + private final int nodeId; + private final Acknowledgements acknowledgements; + + public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) { + this.nodeId = nodeId; + this.acknowledgements = acknowledgements; + } + + public int getNodeId() { Review Comment: ```suggestion public int nodeId() { ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +/** + * This class combines Acknowledgements with the id of the node to use for acknowledging. + */ +public class NodeAcknowledgements { Review Comment: nit: Seems this can be a `record` class itself. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java: ########## @@ -301,10 +307,12 @@ private boolean canOptimiseForSingleAcknowledgeType(AcknowledgementBatch acknowl public String toString() { StringBuilder sb = new StringBuilder("Acknowledgements("); sb.append(acknowledgements); - if (acknowledgeErrorCode != null) { - sb.append(", errorCode="); - sb.append(acknowledgeErrorCode.code()); + if (acknowledgeException != null) { + sb.append(", acknowledgeException="); + sb.append(Errors.forException(acknowledgeException)); } Review Comment: Should this be done without the `if` check, so that the errorCode is always logged explicitly? ``` sb.append(", errorCode="); sb.append(acknowledgeException != null ? Errors.forException(acknowledgeException) : "null"); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java: ########## @@ -115,25 +120,26 @@ public int size() { * @return Whether the acknowledgements were sent to the broker and a response received */ public boolean isCompleted() { - return acknowledgeErrorCode != null; + return completed; } /** - * Set the acknowledgement error code when the response has been received from the broker. + * Set the acknowledgement exception when the response has been received from the broker. * - * @param acknowledgeErrorCode the error code + * @param acknowledgeException the exception (will be null if successful) */ - public void setAcknowledgeErrorCode(Errors acknowledgeErrorCode) { - this.acknowledgeErrorCode = acknowledgeErrorCode; + public void setAcknowledgeException(KafkaException acknowledgeException) { + completed = true; + this.acknowledgeException = acknowledgeException; } /** - * Get the acknowledgement error code when the response has been received from the broker. + * Get the acknowledgement exception when the response has been received from the broker. 
* * @return the error code */ - public Errors getAcknowledgeErrorCode() { - return acknowledgeErrorCode; + public KafkaException getAcknowledgeException() { Review Comment: We generally avoid the `get` prefix on method names, so should it be as shown below? ```suggestion public KafkaException acknowledgeException() { ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +/** + * This class combines Acknowledgements with the id of the node to use for acknowledging. + */ +public class NodeAcknowledgements { + private final int nodeId; + private final Acknowledgements acknowledgements; + + public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) { + this.nodeId = nodeId; + this.acknowledgements = acknowledgements; + } + + public int getNodeId() { + return nodeId; + } + + public Acknowledgements getAcknowledgements() { Review Comment: ```suggestion public Acknowledgements acknowledgements() { ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org