rreddy-22 commented on code in PR #19470: URL: https://github.com/apache/kafka/pull/19470#discussion_r2060972911
########## clients/src/main/java/org/apache/kafka/clients/producer/PreparedTxnState.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import org.apache.kafka.common.record.RecordBatch; + +/** + * Class containing the state of a transaction after it has been prepared for a two-phase commit. + * This state includes the producer ID and epoch, which are needed to commit or abort the transaction. + */ +public class PreparedTxnState { + private final long producerId; + private final short epoch; + + /** + * Creates a new empty PreparedTxnState + */ + public PreparedTxnState() { + this.producerId = RecordBatch.NO_PRODUCER_ID; + this.epoch = RecordBatch.NO_PRODUCER_EPOCH; + } + + /** + * Creates a new PreparedTxnState from a serialized string representation + * + * @param serializedState The serialized string to deserialize. + * @throws IllegalArgumentException if the serialized string is not in the expected format + */ + public PreparedTxnState(String serializedState) { + if (serializedState == null || serializedState.isEmpty()) { + this.producerId = RecordBatch.NO_PRODUCER_ID; + this.epoch = RecordBatch.NO_PRODUCER_EPOCH; + return; + } + + try { + String[] parts = serializedState.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState); + } + + this.producerId = Long.parseLong(parts[0]); + this.epoch = Short.parseShort(parts[1]); + + // Validate the producerId and epoch values. + if (!((this.producerId >= 0 && this.epoch >= 0) || (this.producerId == -1 && this.epoch == -1))) { + throw new IllegalArgumentException("Invalid producer ID and epoch values: " + + producerId + ":" + epoch + ". Both must be either >= 0 or both -1"); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState, e); + } + } + + /** + * Creates a new PreparedTxnState with the given producer ID and epoch + * + * @param producerId The producer ID + * @param epoch The producer epoch + * + * package-private for testing Review Comment: yess I just meant the access modifier is for testing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org