kamalcph commented on code in PR #15241:
URL: https://github.com/apache/kafka/pull/15241#discussion_r1835574783


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java:
##########
@@ -238,35 +207,137 @@ private Iterable<AbortedTxnWithPosition> 
iterable(Supplier<ByteBuffer> allocate)
             @Override
             public boolean hasNext() {
                 try {
-                    return channel.position() - position.value >= 
AbortedTxn.TOTAL_SIZE;
+                    return txnFile.currentPosition() - position.value >= 
AbortedTxn.TOTAL_SIZE;
                 } catch (IOException e) {
-                    throw new KafkaException("Failed read position from the 
transaction index " + file.getAbsolutePath(), e);
+                    throw new KafkaException("Failed read position from the 
transaction index " + txnFile.path().toAbsolutePath(), e);
                 }
             }
 
             @Override
             public AbortedTxnWithPosition next() {
                 try {
                     ByteBuffer buffer = allocate.get();
-                    Utils.readFully(channel, buffer, position.value);
+                    txnFile.read(buffer, position.value);
                     buffer.flip();
 
                     AbortedTxn abortedTxn = new AbortedTxn(buffer);
                     if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
                         throw new KafkaException("Unexpected aborted 
transaction version " + abortedTxn.version()
-                            + " in transaction index " + 
file.getAbsolutePath() + ", current version is "
+                            + " in transaction index " + 
txnFile.path().toAbsolutePath() + ", current version is "
                             + AbortedTxn.CURRENT_VERSION);
                     AbortedTxnWithPosition nextEntry = new 
AbortedTxnWithPosition(abortedTxn, position.value);
                     position.value += AbortedTxn.TOTAL_SIZE;
                     return nextEntry;
                 } catch (IOException e) {
                     // We received an unexpected error reading from the index 
file. We propagate this as an
                     // UNKNOWN error to the consumer, which will cause it to 
retry the fetch.
-                    throw new KafkaException("Failed to read from the 
transaction index " + file.getAbsolutePath(), e);
+                    throw new KafkaException("Failed to read from the 
transaction index " + txnFile.path().toAbsolutePath(), e);
                 }
             }
 
         };
     }
 
+    // Visible for testing
+    static class TransactionIndexFile {
+        // note that the file is not created until we need it
+        private volatile Path path;
+        // channel is reopened as long as there are reads and writes
+        private FileChannel channel;
+
+        TransactionIndexFile(Path path) throws IOException {
+            this.path = path;
+
+            if (Files.exists(path))
+                openChannel();
+        }
+
+        private void openChannel() throws IOException {
+            channel = FileChannel.open(
+                path,
+                StandardOpenOption.CREATE,
+                StandardOpenOption.READ,
+                StandardOpenOption.WRITE
+            );
+            channel.position(channel.size());
+        }
+
+        synchronized void updateParentDir(Path parentDir) {
+            this.path = parentDir.resolve(path.getFileName());
+        }
+
+        synchronized void renameTo(Path other) throws IOException {
+            try {
+                if (Files.exists(path))
+                    Utils.atomicMoveWithFallback(path, other, false);
+            } finally {
+                this.path = other;
+            }
+        }
+
+        synchronized void flush() throws IOException {
+            if (channel != null)
+                channel.force(true);
+        }
+
+        synchronized void closeChannel() throws IOException {
+            if (channel != null)
+                channel.close();
+        }
+        
+        synchronized boolean isChannelOpen() {
+            return channel != null && channel.isOpen();
+        }
+
+        Path path() {
+            return path;
+        }
+
+        synchronized void truncate(long position) throws IOException {
+            if (channel != null)
+                channel.truncate(position);
+        }
+
+        boolean exists() {
+            return Files.exists(path);
+        }
+
+        boolean deleteIfExists() throws IOException {
+            closeChannel();
+            return Files.deleteIfExists(path());
+        }
+
+        void write(ByteBuffer buffer) throws IOException {
+            Utils.writeFully(channel(), buffer);
+        }
+
+        void read(ByteBuffer buffer, int position) throws IOException {

Review Comment:
   can we rename this method to `readFully` as it doesn't read byte-by-byte?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to