junrao commented on code in PR #15241:
URL: https://github.com/apache/kafka/pull/15241#discussion_r1985806237
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java:
##########

@@ -238,35 +207,137 @@ private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> allocate)
             @Override
             public boolean hasNext() {
                 try {
-                    return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE;
+                    return txnFile.currentPosition() - position.value >= AbortedTxn.TOTAL_SIZE;
                 } catch (IOException e) {
-                    throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e);
+                    throw new KafkaException("Failed read position from the transaction index " + txnFile.path().toAbsolutePath(), e);
                 }
             }

             @Override
             public AbortedTxnWithPosition next() {
                 try {
                     ByteBuffer buffer = allocate.get();
-                    Utils.readFully(channel, buffer, position.value);
+                    txnFile.readFully(buffer, position.value);
                     buffer.flip();

                     AbortedTxn abortedTxn = new AbortedTxn(buffer);
                     if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
                         throw new KafkaException("Unexpected aborted transaction version " + abortedTxn.version()
-                            + " in transaction index " + file.getAbsolutePath() + ", current version is "
+                            + " in transaction index " + txnFile.path().toAbsolutePath() + ", current version is "
                             + AbortedTxn.CURRENT_VERSION);
                     AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value);
                     position.value += AbortedTxn.TOTAL_SIZE;
                     return nextEntry;
                 } catch (IOException e) {
                     // We received an unexpected error reading from the index file. We propagate this as an
                     // UNKNOWN error to the consumer, which will cause it to retry the fetch.
-                    throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e);
+                    throw new KafkaException("Failed to read from the transaction index " + txnFile.path().toAbsolutePath(), e);
                 }
             }
         };
     }

+    // Visible for testing
+    static class TransactionIndexFile {
+        // note that the file is not created until we need it
+        private volatile Path path;
+        // channel is reopened as long as there are reads and writes
+        private FileChannel channel;
+
+        TransactionIndexFile(Path path) throws IOException {
+            this.path = path;
+
+            if (Files.exists(path))
+                openChannel();
+        }
+
+        private void openChannel() throws IOException {
+            channel = FileChannel.open(
+                path,
+                StandardOpenOption.CREATE,
+                StandardOpenOption.READ,
+                StandardOpenOption.WRITE
+            );
+            channel.position(channel.size());
+        }
+
+        synchronized void updateParentDir(Path parentDir) {

Review Comment:
   The solution is not ideal. When the channel is closed, it's possible that a `LocalLog.read()` is also reading from the channel and will hit `ClosedByInterruptException`. Since this is an `IOException`, the `maybeHandleIOException()` logic will treat it as a failed disk and kill the broker (if there is only one disk).

   I think it's better to change the logic in RemoteLogReader so that it never interrupts the thread. Note that we made other threads like ReplicaFetcherThread and CleanerThread non-interruptible for the same reason, so this approach would be consistent with the existing ones. Specifically, we could change the following code to cancel without interruption:

   `val cancelled = remoteFetchTask.cancel(true)`

   We can then pass a cancelled flag into RemoteLogReader. The flag will be set when the task is cancelled, and `RemoteLogManager.read()` can check this flag to abort the operation early if needed. This way, we avoid both the risk of a false disk failure and the overhead of reopening the file channel for the txn index.
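   To make the failure mode concrete, here is a small, self-contained sketch (not Kafka code) showing how an interrupt delivered to a thread performing `FileChannel` I/O closes the channel and surfaces `ClosedByInterruptException`, which is an `IOException` and would therefore be caught by `maybeHandleIOException()`:

   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.ClosedByInterruptException;
   import java.nio.channels.FileChannel;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;

   public class InterruptCloseDemo {
       public static void main(String[] args) throws Exception {
           Path tmp = Files.createTempFile("txn-index-demo", ".tmp");
           Files.write(tmp, new byte[1024]);
           FileChannel channel = FileChannel.open(tmp, StandardOpenOption.READ);

           Thread reader = new Thread(() -> {
               try {
                   // Simulate remoteFetchTask.cancel(true) delivering an interrupt just
                   // before the read: the NIO channel machinery sees the interrupt
                   // status, closes the channel, and throws ClosedByInterruptException.
                   Thread.currentThread().interrupt();
                   channel.read(ByteBuffer.allocate(16), 0);
               } catch (ClosedByInterruptException e) {
                   System.out.println("got ClosedByInterruptException (an IOException)");
               } catch (IOException e) {
                   System.out.println("got " + e);
               }
           });
           reader.start();
           reader.join();

           // The channel is now closed for every other reader of the same file too.
           System.out.println("channel open after interrupt? " + channel.isOpen());
           Files.deleteIfExists(tmp);
       }
   }
   ```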
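   And a minimal sketch of the suggested flag-based cancellation. The names (`RemoteLogReaderSketch`, `fetchNextChunk`) are hypothetical; the real change would live in RemoteLogReader and `RemoteLogManager.read()`, with the caller presumably switching from `remoteFetchTask.cancel(true)` to setting the flag (and at most `cancel(false)`):

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical sketch only: fetchNextChunk() stands in for the actual remote read.
   class RemoteLogReaderSketch implements Runnable {
       // Set on cancellation instead of interrupting the reader thread.
       private final AtomicBoolean cancelled = new AtomicBoolean(false);

       // Called by the fetch path instead of future.cancel(true); no interrupt is
       // delivered, so concurrent FileChannel reads (e.g. on the txn index) cannot
       // fail with ClosedByInterruptException.
       void cancel() {
           cancelled.set(true);
       }

       @Override
       public void run() {
           // RemoteLogManager.read() would poll the same flag at safe points and
           // return early; modeled here as a simple read loop.
           while (!cancelled.get() && fetchNextChunk()) {
               // process the chunk ...
           }
       }

       // Placeholder for one step of the remote read; returns false when done.
       private boolean fetchNextChunk() {
           return false;
       }
   }
   ```

   Because no interrupt is ever delivered, a concurrent read on the same channel completes normally and the channel stays open for the next fetch, avoiding both the false disk-failure path and the channel-reopening overhead described above.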