artemlivshits commented on code in PR #19470: URL: https://github.com/apache/kafka/pull/19470#discussion_r2057587163
########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -1595,4 +1595,105 @@ else if (recordPartition != null) return topicPartition; } } + + /** + * A class that represents the state of a prepared transaction that can be + * serialized to and deserialized from a string. + * Objects of this class can be written to and read from a database + * to support recovery of prepared transactions. + */ + public static class PreparedTxnState { + private final long producerId; + private final short epoch; + + /** + * Creates a new empty PreparedTxnState + */ + public PreparedTxnState() { + this.producerId = -1L; + this.epoch = -1; + } + + /** + * Creates a new PreparedTxnState from a serialized string representation + * + * @param serializedState The serialized string to deserialize. + * @throws IllegalArgumentException if the serialized string is not in the expected format + */ + public PreparedTxnState(String serializedState) { + if (serializedState == null || serializedState.isEmpty()) { + this.producerId = -1L; + this.epoch = -1; + return; + } + + try { + String[] parts = serializedState.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState); + } + + this.producerId = Long.parseLong(parts[0]); + this.epoch = Short.parseShort(parts[1]); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState, e); + } + } + + /** + * Creates a new PreparedTxnState with the given producer ID and epoch + * + * @param producerId The producer ID + * @param epoch The producer epoch + */ + public PreparedTxnState(long producerId, short epoch) { + this.producerId = producerId; + this.epoch = epoch; + } + + public long producerId() { + return producerId; + } + + public short epoch() { + return epoch; + } + + /** + * Checks if this state contains valid producer ID and epoch values. + * A state is considered valid if the producer ID is not -1. + * + * @return true if the state is valid, false otherwise + */ + public boolean isValid() { Review Comment: Should probably change the name to `isSet` or `hasTransaction` or `isEmpty` (the last is obviously inverted) -- the value is valid, just doesn't specify any specify transaction. ########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -1595,4 +1595,105 @@ else if (recordPartition != null) return topicPartition; } } + + /** + * A class that represents the state of a prepared transaction that can be + * serialized to and deserialized from a string. + * Objects of this class can be written to and read from a database + * to support recovery of prepared transactions. + */ + public static class PreparedTxnState { + private final long producerId; + private final short epoch; + + /** + * Creates a new empty PreparedTxnState + */ + public PreparedTxnState() { + this.producerId = -1L; + this.epoch = -1; + } + + /** + * Creates a new PreparedTxnState from a serialized string representation + * + * @param serializedState The serialized string to deserialize. + * @throws IllegalArgumentException if the serialized string is not in the expected format + */ + public PreparedTxnState(String serializedState) { + if (serializedState == null || serializedState.isEmpty()) { + this.producerId = -1L; + this.epoch = -1; Review Comment: We can use `RecordBatch.NO_PRODUCER_ID` and `RecordBatch.NO_PRODUCER_EPOCH` here. ########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -1595,4 +1595,105 @@ else if (recordPartition != null) return topicPartition; } } + + /** + * A class that represents the state of a prepared transaction that can be + * serialized to and deserialized from a string. + * Objects of this class can be written to and read from a database + * to support recovery of prepared transactions. + */ + public static class PreparedTxnState { + private final long producerId; + private final short epoch; + + /** + * Creates a new empty PreparedTxnState + */ + public PreparedTxnState() { + this.producerId = -1L; + this.epoch = -1; + } + + /** + * Creates a new PreparedTxnState from a serialized string representation + * + * @param serializedState The serialized string to deserialize. + * @throws IllegalArgumentException if the serialized string is not in the expected format + */ + public PreparedTxnState(String serializedState) { + if (serializedState == null || serializedState.isEmpty()) { + this.producerId = -1L; + this.epoch = -1; + return; + } + + try { + String[] parts = serializedState.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState); + } + + this.producerId = Long.parseLong(parts[0]); + this.epoch = Short.parseShort(parts[1]); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState, e); + } + } + + /** + * Creates a new PreparedTxnState with the given producer ID and epoch + * + * @param producerId The producer ID + * @param epoch The producer epoch + */ + public PreparedTxnState(long producerId, short epoch) { + this.producerId = producerId; + this.epoch = epoch; + } + + public long producerId() { + return producerId; + } + + public short epoch() { + return epoch; + } + + /** + * Checks if this state contains valid producer ID and epoch values. + * A state is considered valid if the producer ID is not -1. + * + * @return true if the state is valid, false otherwise + */ + public boolean isValid() { + return producerId != -1L; + } + + /** + * Returns a serialized string representation of this transaction state + * The format is "producerId:epoch" + * + * @return a serialized string representation + */ + @Override + public String toString() { + return producerId + ":" + epoch; Review Comment: Should we just return an empty string if producerId / epoch is not set? This would make it so that empty state is represented by one string value (empty) rather than two (empty and "-1:-1"). ########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -1595,4 +1595,105 @@ else if (recordPartition != null) return topicPartition; } } + + /** + * A class that represents the state of a prepared transaction that can be + * serialized to and deserialized from a string. + * Objects of this class can be written to and read from a database + * to support recovery of prepared transactions. + */ + public static class PreparedTxnState { + private final long producerId; + private final short epoch; + + /** + * Creates a new empty PreparedTxnState + */ + public PreparedTxnState() { + this.producerId = -1L; + this.epoch = -1; + } + + /** + * Creates a new PreparedTxnState from a serialized string representation + * + * @param serializedState The serialized string to deserialize. + * @throws IllegalArgumentException if the serialized string is not in the expected format + */ + public PreparedTxnState(String serializedState) { + if (serializedState == null || serializedState.isEmpty()) { + this.producerId = -1L; + this.epoch = -1; + return; + } + + try { + String[] parts = serializedState.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState); + } + + this.producerId = Long.parseLong(parts[0]); + this.epoch = Short.parseShort(parts[1]); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState, e); + } + } + + /** + * Creates a new PreparedTxnState with the given producer ID and epoch + * + * @param producerId The producer ID + * @param epoch The producer epoch + */ + public PreparedTxnState(long producerId, short epoch) { Review Comment: This shouldn't be public. An application should be able to either create `PreparedTxnState()` or `PreparedTxnState(string)`. ########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -1595,4 +1595,105 @@ else if (recordPartition != null) return topicPartition; } } + + /** + * A class that represents the state of a prepared transaction that can be + * serialized to and deserialized from a string. + * Objects of this class can be written to and read from a database + * to support recovery of prepared transactions. + */ + public static class PreparedTxnState { + private final long producerId; + private final short epoch; + + /** + * Creates a new empty PreparedTxnState + */ + public PreparedTxnState() { + this.producerId = -1L; + this.epoch = -1; Review Comment: We can use `RecordBatch.NO_PRODUCER_ID` and `RecordBatch.NO_PRODUCER_EPOCH` here. ########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -1595,4 +1595,105 @@ else if (recordPartition != null) return topicPartition; } } + + /** + * A class that represents the state of a prepared transaction that can be + * serialized to and deserialized from a string. + * Objects of this class can be written to and read from a database + * to support recovery of prepared transactions. + */ + public static class PreparedTxnState { + private final long producerId; + private final short epoch; + + /** + * Creates a new empty PreparedTxnState + */ + public PreparedTxnState() { + this.producerId = -1L; + this.epoch = -1; + } + + /** + * Creates a new PreparedTxnState from a serialized string representation + * + * @param serializedState The serialized string to deserialize. + * @throws IllegalArgumentException if the serialized string is not in the expected format + */ + public PreparedTxnState(String serializedState) { + if (serializedState == null || serializedState.isEmpty()) { + this.producerId = -1L; + this.epoch = -1; + return; + } + + try { + String[] parts = serializedState.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid serialized transaction state format: " + serializedState); + } + + this.producerId = Long.parseLong(parts[0]); + this.epoch = Short.parseShort(parts[1]); Review Comment: Should also validate that these are either >= 0 or -1:-1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org