ijuma commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1084344515
########## clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java: ########## @@ -273,20 +272,32 @@ public int partitionLeaderEpoch() { public DataInputStream recordInputStream(BufferSupplier bufferSupplier) { final ByteBuffer buffer = this.buffer.duplicate(); buffer.position(RECORDS_OFFSET); - return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier)); + final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier); + return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream); } private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) { final DataInputStream inputStream = recordInputStream(bufferSupplier); if (skipKeyValue) { // this buffer is used to skip length delimited fields like key, value, headers - byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE]; + final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize()); Review Comment: Since we cache buffers per thread, I think you mean we will use two buffers instead of one per thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org