jeqo commented on code in PR #15241: URL: https://github.com/apache/kafka/pull/15241#discussion_r1650469442
########## storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java: ########## @@ -190,75 +175,145 @@ public void sanityCheck() { AbortedTxn abortedTxn = txnWithPosition.txn; if (abortedTxn.lastOffset() < startOffset) throw new CorruptIndexException("Last offset of aborted transaction " + abortedTxn + " in index " - + file.getAbsolutePath() + " is less than start offset " + startOffset); + + txnFile.path().toAbsolutePath() + " is less than start offset " + startOffset); } } - private FileChannel openChannel() throws IOException { - FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, - StandardOpenOption.READ, StandardOpenOption.WRITE); - maybeChannel = Optional.of(channel); - channel.position(channel.size()); - return channel; - } - - private FileChannel channel() throws IOException { - FileChannel channel = channelOrNull(); - if (channel == null) - return openChannel(); - else - return channel; - } - - private FileChannel channelOrNull() { - return maybeChannel.orElse(null); - } - private Iterable<AbortedTxnWithPosition> iterable() { return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE)); } private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> allocate) { - FileChannel channel = channelOrNull(); - if (channel == null) + if (!txnFile.exists()) return Collections.emptyList(); - PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0); + try { + FileChannel channel = txnFile.channel(); + PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0); + + return () -> new Iterator<AbortedTxnWithPosition>() { + + @Override + public boolean hasNext() { + try { + return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE; + } catch (IOException e) { + throw new KafkaException("Failed read position from the transaction index " + txnFile.path().toAbsolutePath(), e); + } + } + + @Override + public AbortedTxnWithPosition next() { + try { + ByteBuffer buffer = allocate.get(); + 
Utils.readFully(channel, buffer, position.value); + buffer.flip(); + + AbortedTxn abortedTxn = new AbortedTxn(buffer); + if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION) + throw new KafkaException("Unexpected aborted transaction version " + abortedTxn.version() + + " in transaction index " + txnFile.path().toAbsolutePath() + ", current version is " + + AbortedTxn.CURRENT_VERSION); + AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value); + position.value += AbortedTxn.TOTAL_SIZE; + return nextEntry; + } catch (IOException e) { + // We received an unexpected error reading from the index file. We propagate this as an + // UNKNOWN error to the consumer, which will cause it to retry the fetch. + throw new KafkaException("Failed to read from the transaction index " + txnFile.path().toAbsolutePath(), e); + } + } - return () -> new Iterator<AbortedTxnWithPosition>() { + }; + + } catch (IOException e) { + throw new KafkaException("Failed to read from the transaction index " + txnFile.path().toAbsolutePath(), e); + } + } - @Override - public boolean hasNext() { - try { - return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE; - } catch (IOException e) { - throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e); + public static class TransactionIndexFile { Review Comment: It seems that making it private means it can only be accessed from the TransactionIndex class, and not from tests or other classes in the same module. Switching the access to package-private and adding a comment on visibility in the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org