jeqo commented on code in PR #15241:
URL: https://github.com/apache/kafka/pull/15241#discussion_r1650469442


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java:
##########
@@ -190,75 +175,145 @@ public void sanityCheck() {
             AbortedTxn abortedTxn = txnWithPosition.txn;
             if (abortedTxn.lastOffset() < startOffset)
                 throw new CorruptIndexException("Last offset of aborted 
transaction " + abortedTxn + " in index "
-                    + file.getAbsolutePath() + " is less than start offset " + 
startOffset);
+                    + txnFile.path().toAbsolutePath() + " is less than start 
offset " + startOffset);
         }
     }
 
-    private FileChannel openChannel() throws IOException {
-        FileChannel channel = FileChannel.open(file.toPath(), 
StandardOpenOption.CREATE,
-                StandardOpenOption.READ, StandardOpenOption.WRITE);
-        maybeChannel = Optional.of(channel);
-        channel.position(channel.size());
-        return channel;
-    }
-
-    private FileChannel channel() throws IOException {
-        FileChannel channel = channelOrNull();
-        if (channel == null)
-            return openChannel();
-        else
-            return channel;
-    }
-
-    private FileChannel channelOrNull() {
-        return maybeChannel.orElse(null);
-    }
-
     private Iterable<AbortedTxnWithPosition> iterable() {
         return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE));
     }
 
     private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> 
allocate) {
-        FileChannel channel = channelOrNull();
-        if (channel == null)
+        if (!txnFile.exists())
             return Collections.emptyList();
 
-        PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
+        try {
+            FileChannel channel = txnFile.channel();
+            PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
+
+            return () -> new Iterator<AbortedTxnWithPosition>() {
+
+                @Override
+                public boolean hasNext() {
+                    try {
+                        return channel.position() - position.value >= 
AbortedTxn.TOTAL_SIZE;
+                    } catch (IOException e) {
+                        throw new KafkaException("Failed read position from 
the transaction index " + txnFile.path().toAbsolutePath(), e);
+                    }
+                }
+
+                @Override
+                public AbortedTxnWithPosition next() {
+                    try {
+                        ByteBuffer buffer = allocate.get();
+                        Utils.readFully(channel, buffer, position.value);
+                        buffer.flip();
+
+                        AbortedTxn abortedTxn = new AbortedTxn(buffer);
+                        if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
+                            throw new KafkaException("Unexpected aborted 
transaction version " + abortedTxn.version()
+                                + " in transaction index " + 
txnFile.path().toAbsolutePath() + ", current version is "
+                                + AbortedTxn.CURRENT_VERSION);
+                        AbortedTxnWithPosition nextEntry = new 
AbortedTxnWithPosition(abortedTxn, position.value);
+                        position.value += AbortedTxn.TOTAL_SIZE;
+                        return nextEntry;
+                    } catch (IOException e) {
+                        // We received an unexpected error reading from the 
index file. We propagate this as an
+                        // UNKNOWN error to the consumer, which will cause it 
to retry the fetch.
+                        throw new KafkaException("Failed to read from the 
transaction index " + txnFile.path().toAbsolutePath(), e);
+                    }
+                }
 
-        return () -> new Iterator<AbortedTxnWithPosition>() {
+            };
+
+        } catch (IOException e) {
+            throw new KafkaException("Failed to read from the transaction 
index " + txnFile.path().toAbsolutePath(), e);
+        }
+    }
 
-            @Override
-            public boolean hasNext() {
-                try {
-                    return channel.position() - position.value >= 
AbortedTxn.TOTAL_SIZE;
-                } catch (IOException e) {
-                    throw new KafkaException("Failed read position from the 
transaction index " + file.getAbsolutePath(), e);
+    public static class TransactionIndexFile {

Review Comment:
   Seems that by making it private, it can only be accessed from 
TransactionIndex class but not tests or others on the same module. 
   Switching access to package-private and adding comment on visibility on last 
commit. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to