hachikuji commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1187662801


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -103,28 +121,44 @@ public boolean maybeUpdateProducerEpoch(short 
producerEpoch) {
             return false;
         }
     }
+    
+    public void maybeUpdateTentaitiveSequence(int sequence) {

Review Comment:
   typo: `Tentaitive` -> `Tentative`



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -103,28 +121,44 @@ public boolean maybeUpdateProducerEpoch(short 
producerEpoch) {
             return false;
         }
     }
+    
+    public void maybeUpdateTentaitiveSequence(int sequence) {
+        if (batchMetadata.isEmpty() && (!this.tentativeSequence.isPresent() || 
this.tentativeSequence.getAsInt() > sequence))
+            this.tentativeSequence = OptionalInt.of(sequence);
+    }
 
     private void addBatchMetadata(BatchMetadata batch) {
+        // When appending a batch, we no longer need tentative sequence.
+        this.tentativeSequence = OptionalInt.empty();
         if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) 
batchMetadata.removeFirst();
         batchMetadata.add(batch);
     }
+    
+    public boolean compareAndSetVerificationState(short expectedProducerEpoch, 
VerificationState expectedVerificationState, VerificationState 
newVerificationState) {
+        if (expectedProducerEpoch == this.producerEpoch && verificationState 
== expectedVerificationState) {
+            this.verificationState = newVerificationState;
+            return true;
+        }
+        return false;
+    }
 
     public void update(ProducerStateEntry nextEntry) {
-        update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, 
nextEntry.lastTimestamp, nextEntry.batchMetadata, 
nextEntry.currentTxnFirstOffset);
+        update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, 
nextEntry.lastTimestamp, nextEntry.batchMetadata, 
nextEntry.currentTxnFirstOffset, nextEntry.verificationState);
     }
 
     public void update(short producerEpoch, int coordinatorEpoch, long 
lastTimestamp) {
-        update(producerEpoch, coordinatorEpoch, lastTimestamp, new 
ArrayDeque<>(0), OptionalLong.empty());
+        update(producerEpoch, coordinatorEpoch, lastTimestamp, new 
ArrayDeque<>(0), OptionalLong.empty(), VerificationState.EMPTY);
     }
 
     private void update(short producerEpoch, int coordinatorEpoch, long 
lastTimestamp, Deque<BatchMetadata> batchMetadata,
-                        OptionalLong currentTxnFirstOffset) {
+                        OptionalLong currentTxnFirstOffset, VerificationState 
verificationState) {

Review Comment:
   nit: when arg lists go above 2 or 3, it's helpful to start putting each 
argument on a separate line:
   ```java
   private void update(
     short producerEpoch,
     int coordinatorEpoch,
     long lastTimestamp,
     ...
   ) {
   ```



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+    
+    private VerificationState verificationState;
+    
+    // Before any batches are associated with the entry, the tentative 
sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+    
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }
 
     public static ProducerStateEntry empty(long producerId) {
-        return new ProducerStateEntry(producerId, 
RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, 
OptionalLong.empty(), Optional.empty());
+        return new ProducerStateEntry(producerId, 
RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, 
OptionalLong.empty(), Optional.empty(), VerificationState.EMPTY, 
OptionalInt.empty());
+    }
+
+    public static ProducerStateEntry forVerification(long producerId, short 
producerEpoch, long milliseconds) {

Review Comment:
   nit: `milliseconds` -> `lastTimestamp`?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+    
+    public ProducerStateEntry entryForVerification(long producerId, short 
producerEpoch, int firstSequence) {
+        ProducerStateEntry entry;
+        if (producers.containsKey(producerId)) {

Review Comment:
   nit: usually we would call `get` and check for null (saves one hash lookup)



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,33 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
-    val entry = producerStateManager.activeProducers.get(producerId)
-    entry != null && entry.currentTxnFirstOffset.isPresent
+  def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, 
baseSequence: Int): Boolean = lock synchronized {
+    val entry = producerStateManager.entryForVerification(producerId, 
producerEpoch, baseSequence)
+    (!entry.currentTxnFirstOffset.isPresent) &&
+      (entry.compareAndSetVerificationState(producerEpoch, 
ProducerStateEntry.VerificationState.EMPTY, 
ProducerStateEntry.VerificationState.VERIFYING) ||
+        entry.verificationState() == 
ProducerStateEntry.VerificationState.VERIFYING)
+  }
+  
+  def compareAndSetVerificationState(producerId: Long,
+                                     producerEpoch: Short,
+                                     baseSequence: Int,
+                                     expectedVerificationState: 
ProducerStateEntry.VerificationState,
+                                     newVerificationState: 
ProducerStateEntry.VerificationState): Unit = { lock synchronized {

Review Comment:
   nit: can drop braces around `lock synchronized`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to