ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1158577698


##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStreamBufferSource.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides a BytesStream wrapper over a buffer.
+ */
+public class BytesStreamBufferSource implements BytesStream {
+    final private ByteBuffer buf;
+
+    public BytesStreamBufferSource(final ByteBuffer buffer) {
+        // we do not modify the markers of source buffer
+        buf = buffer.duplicate();
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        return buf.get() & 0xFF;
+    }
+
+    @Override
+    public int skipBytes(int toSkip) {
+        if (toSkip <= 0) {
+            return 0;
+        }
+
+        int avail = Math.min(toSkip, buf.remaining());
+        buf.position(buf.position() + avail);
+        return avail;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buf.remaining());
+        buf.get(b, off, len);
+        return len;
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        try {
+            return buf.get();

Review Comment:
   Would it be better to call `read` and check for `-1`? That would result in 
one exception (and the resulting stack trace) instead of two.
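
   A minimal sketch of the suggestion above, under stated assumptions: 
`BufferBackedStream` is a hypothetical, simplified stand-in and not the class 
from the PR. Here `readByte` delegates to `read` and throws a single 
`EOFException` on `-1`, so no `BufferUnderflowException` is created and 
rethrown.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-in for the class under review.
final class BufferBackedStream {
    private final ByteBuffer buf;

    BufferBackedStream(ByteBuffer buffer) {
        // duplicate() leaves the caller's position and limit untouched
        this.buf = buffer.duplicate();
    }

    // Returns the next byte as an unsigned value (0..255), or -1 at the end.
    int read() {
        if (!buf.hasRemaining()) {
            return -1;
        }
        return buf.get() & 0xFF;
    }

    // Delegates to read(); only one exception is created when the data runs out.
    byte readByte() throws IOException {
        int b = read();
        if (b == -1) {
            throw new EOFException();
        }
        return (byte) b;
    }
}
```

   The trade-off is one extra mask and cast on each successful call, in 
exchange for a cheaper end-of-data path.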



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks 
of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to 
sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call 
crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and 
BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()

Review Comment:
   Incomplete sentence?



##########
clients/src/main/java/org/apache/kafka/common/utils/SkippableChunkedBytesStream.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * SkippableChunkedBytesStream is a variant of ChunkedBytesStream which does 
not push skip() to the sourceStream.
+ * <p>
+ * Unlike BufferedInputStream.skip() and ChunkedBytesStream.skip(), this does 
not push skip() to sourceStream.
+ * We want to avoid pushing this to sourceStream because it's implementation 
maybe inefficient, e.g. the case of Z
+ * stdInputStream which allocates a new buffer from buffer pool, per skip call.
+ *
+ * @see ChunkedBytesStream
+ */
+public class SkippableChunkedBytesStream extends ChunkedBytesStream {

Review Comment:
   Would it be simpler to make this behavior configurable in 
`ChunkedBytesStream`? Also, I thought the zstd issue was fixed?
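
   One way the configurable behavior could look, as a rough sketch only: 
`ConfigurableSkipStream`, the `delegateSkipToSource` flag and the 
`sourceSkipCalls` counter are hypothetical names for illustration, not code 
from the PR. When the flag is false, skipping is served by reading chunks and 
discarding them; when it is true, the remainder is pushed to the source's 
`skip()`.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch; names and structure are illustrative, not the PR's code.
final class ConfigurableSkipStream {
    private final InputStream source;
    private final byte[] chunk;
    private final boolean delegateSkipToSource;
    private int pos = 0;
    private int limit = 0;
    int sourceSkipCalls = 0; // exposed only so the behavior can be observed

    ConfigurableSkipStream(InputStream source, int chunkSize, boolean delegateSkipToSource) {
        this.source = source;
        this.chunk = new byte[chunkSize]; // assumes chunkSize > 0
        this.delegateSkipToSource = delegateSkipToSource;
    }

    // Refills the chunk from the source; afterwards limit is the number of valid bytes.
    private void fill() throws IOException {
        pos = 0;
        limit = 0;
        int n = source.read(chunk, 0, chunk.length);
        if (n > 0) {
            limit = n;
        }
    }

    int read() throws IOException {
        if (pos >= limit) {
            fill();
            if (pos >= limit) {
                return -1;
            }
        }
        return chunk[pos++] & 0xFF;
    }

    int skipBytes(int toSkip) throws IOException {
        if (toSkip <= 0) {
            return 0;
        }
        int skipped = 0;
        while (skipped < toSkip) {
            if (pos >= limit) {
                if (delegateSkipToSource) {
                    // Buffered bytes are used up: push the rest to the source.
                    long n = source.skip(toSkip - skipped);
                    sourceSkipCalls++;
                    if (n <= 0) {
                        break;
                    }
                    skipped += (int) n;
                    continue;
                }
                // Otherwise read a chunk and discard from it.
                fill();
                if (pos >= limit) {
                    break; // end of source
                }
            }
            int step = Math.min(limit - pos, toSkip - skipped);
            pos += step;
            skipped += step;
        }
        return skipped;
    }
}
```

   With a flag like this, each compression type could pick the mode in its 
`wrapForInput` instead of choosing between two classes.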



##########
clients/src/test/java/org/apache/kafka/common/utils/SkippableChunkedBytesStreamTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SkippableChunkedBytesStreamTest {
+    private static final Random RANDOM = new Random(1337);
+    private final BufferSupplier supplier = BufferSupplier.NO_CACHING;
+
+    @ParameterizedTest
+    @MethodSource("provideSourceSkipValuesForTest")
+    public void skip_testCorrectness(int bytesToPreRead, ByteBuffer inputBuf, 
int numBytesToSkip) throws IOException {
+        int expectedInpLeftAfterSkip = inputBuf.remaining() - bytesToPreRead - 
numBytesToSkip;
+        int expectedSkippedBytes = Math.min(inputBuf.remaining() - 
bytesToPreRead, numBytesToSkip);
+
+        try (BytesStream is = new ChunkedBytesStream(new 
ByteBufferInputStream(inputBuf.duplicate()), supplier, 10)) {
+            int cnt = 0;
+            while (cnt++ < bytesToPreRead) {
+                is.readByte();
+            }
+
+            int res = is.skipBytes(numBytesToSkip);
+            assertEquals(expectedSkippedBytes, res);
+
+            // verify that we are able to read rest of the input
+            cnt = 0;
+            while (cnt++ < expectedInpLeftAfterSkip) {
+                is.readByte();
+            }
+        }
+    }
+
+    @Test
+    public void skip_testEndOfSource() throws IOException {

Review Comment:
   We don't typically use this kind of naming convention in our tests (i.e. 
using `_`).



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -64,17 +66,20 @@ public OutputStream wrapForOutput(ByteBufferOutputStream 
buffer, byte messageVer
         }
 
         @Override
-        public InputStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
+        public BytesStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                // Set output buffer (uncompressed) to 16 KB (none by default) 
and input buffer (compressed) to
-                // 8 KB (0.5 KB by default) to ensure reasonable performance 
in cases where the caller reads a small
-                // number of bytes (potentially a single byte)
-                return new BufferedInputStream(new GZIPInputStream(new 
ByteBufferInputStream(buffer), 8 * 1024),
-                        16 * 1024);
+                // Set output buffer (uncompressed) to 16 KB and Set input 
buffer (compressed) to 8 KB (0.5 KB by default) to ensure reasonable 
performance in cases

Review Comment:
   The 16 KB value now comes from `getRecommendedDOutSize`, so it's a bit 
confusing to keep that part of the comment here.



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream 
buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, 
decompressionBufferSupplier,
-                                                    messageVersion == 
RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(
+                    new KafkaLZ4BlockInputStream(inputBuffer, 
decompressionBufferSupplier, messageVersion == RecordBatch.MAGIC_VALUE_V0),
+                    decompressionBufferSupplier, getRecommendedDOutSize());
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   It's worth adding a comment in the method itself - it will be hard to 
remember otherwise.



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -90,8 +95,13 @@ public OutputStream wrapForOutput(ByteBufferOutputStream 
buffer, byte messageVer
         }
 
         @Override
-        public InputStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
-            return SnappyFactory.wrapForInput(buffer);
+        public BytesStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
+            return new 
SkippableChunkedBytesStream(SnappyFactory.wrapForInput(buffer), 
decompressionBufferSupplier, getRecommendedDOutSize());
+        }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 8 * 1024; // 8KB

Review Comment:
   Why is this 8KB while the lz4 one is 2KB?



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks 
of configurable size. The

Review Comment:
   What is a `ByteReader`? Did you mean `BytesStream`?



##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStreamBufferSource.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides a BytesStream wrapper over a buffer.
+ */
+public class BytesStreamBufferSource implements BytesStream {

Review Comment:
   Maybe call this `ByteBufferBytesStream`?



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks 
of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to 
sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call 
crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and 
BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()
+ * - Unlike BufferedInputStream, which allocates an intermediate buffer, this 
uses a buffer supplier to create the
+ * intermediate buffer
+ * - Unlike DataInputStream, the readByte method does not push the reading of 
a byte to sourceStream.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where 
multiple threads access this.
+ * - many method are un-supported in this class because they aren't currently 
used in the caller code.
+ * - the implementation of this class is performance sensitive. Minor changes 
as usage of ByteBuffer instead of byte[]
+ *   can significantly impact performance, hence, proceed with caution.
+ */
+public class ChunkedBytesStream implements BytesStream {
+    /**
+     * Supplies the ByteBuffer which is used as intermediate buffer to store 
the chunk of output data.
+     */
+    private final BufferSupplier bufferSupplier;
+    /**
+     * Source stream containing compressed data.
+     */
+    private InputStream sourceStream;
+    /**
+     * Intermediate buffer to store the chunk of output data. The 
ChunkedBytesStream is considered closed if
+     * this buffer is null.
+     */
+    private byte[] intermediateBuf;
+    protected int limit;
+    /**
+     *
+     */
+    protected int pos;
+    /**
+     * Reference for the intermediate buffer. This reference is only kept for 
releasing the buffer from the '
+     * buffer supplier.
+     */
+    private ByteBuffer intermediateBufRef;
+
+
+    public ChunkedBytesStream(InputStream sourceStream, BufferSupplier 
bufferSupplier, int intermediateBufSize) {
+        this.bufferSupplier = bufferSupplier;
+        this.sourceStream = sourceStream;
+        intermediateBufRef = bufferSupplier.get(intermediateBufSize);
+        if (!intermediateBufRef.hasArray() || 
(intermediateBufRef.arrayOffset() != 0)) {
+            throw new IllegalArgumentException("provided ByteBuffer lacks 
array or has non-zero arrayOffset");
+        }
+        intermediateBuf = intermediateBufRef.array();
+    }
+
+    private byte[] getBufIfOpen() throws IOException {
+        byte[] buffer = intermediateBuf;
+        if (buffer == null)
+            throw new IOException("Stream closed");
+        return buffer;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (pos >= limit) {
+            fill();
+            if (pos >= limit)
+                return -1;
+        }
+
+        return getBufIfOpen()[pos++] & 0xff;
+    }
+
+    InputStream getInIfOpen() throws IOException {
+        InputStream input = sourceStream;
+        if (input == null)
+            throw new IOException("Stream closed");
+        return input;
+    }
+
+    /**
+     * Fills the intermediate buffer with more data. The amount of new data 
read is equal to the remaining empty space
+     * in the buffer. For optimal performance, read as much data as possible 
in this call.
+     */
+    int fill() throws IOException {
+        byte[] buffer = getBufIfOpen();
+
+        // switch to writing mode
+        pos = 0;
+        limit = pos;
+        int bytesRead = getInIfOpen().read(buffer, pos, buffer.length - pos);
+
+        if (bytesRead > 0)
+            limit = bytesRead + pos;
+
+        return bytesRead;
+    }
+
+    @Override
+    public void close() throws IOException {
+        byte[] mybuf = intermediateBuf;
+        intermediateBuf = null;
+
+        InputStream input = sourceStream;
+        sourceStream = null;
+
+        if (mybuf != null)
+            bufferSupplier.release(intermediateBufRef);
+        if (input != null)
+            input.close();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int totalRead = 0;
+        int bytesRead = 0;
+        while (totalRead < len) {
+            bytesRead = 0;
+            int toRead = len - totalRead;
+            if (pos >= limit) {
+                if (toRead >= getBufIfOpen().length) {
+                    // don't use intermediate buffer if we need to read more 
than it's capacity
+                    bytesRead = getInIfOpen().read(b, off + totalRead, toRead);
+                    if (bytesRead < 0)
+                        break;
+                } else {
+                    fill();
+                    if (pos >= limit)
+                        break;
+                }
+            } else {
+                int avail = limit - pos;
+                toRead = (avail < toRead) ? avail : toRead;
+                System.arraycopy(getBufIfOpen(), pos, b, off + totalRead, 
toRead);
+                pos += toRead;
+                bytesRead = toRead;
+            }
+
+            totalRead += bytesRead;
+        }
+
+        if ((bytesRead <= 0) && (totalRead < len))
+            return -1;
+
+        return totalRead;
+    }
+
+    @Override
+    public int skipBytes(int toSkip) throws IOException {
+        if (toSkip <= 0) {
+            return 0;
+        }
+        int totalSkipped = 0;
+
+        // Skip what exists in the intermediate buffer first
+        int avail = limit - pos;
+        int bytesToRead = (avail < (toSkip - totalSkipped)) ? avail : (toSkip 
- totalSkipped);
+        pos += bytesToRead;
+        totalSkipped += bytesToRead;
+
+        // Use sourceStream's skip() to skip the rest
+        while ((totalSkipped < toSkip) && ((bytesToRead = (int) 
getInIfOpen().skip(toSkip - totalSkipped)) > 0)) {
+            totalSkipped += bytesToRead;
+        }
+
+        return totalSkipped;
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        if (pos >= limit) {
+            fill();
+            if (pos >= limit)
+                throw new EOFException();
+        }
+        return getBufIfOpen()[pos++];
+    }
+
+    // visible for testing
+    public InputStream getSourceStream() {

Review Comment:
   Nit: we typically avoid the `get` prefix in Kafka.



##########
clients/src/main/java/org/apache/kafka/common/utils/BytesStream.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This interface provides for reading bytes from an underlying source. The 
source could be a buffer or a stream.
+ * It extends the {@link Closeable} interface to ensure that the source is 
appropriately closed (if required).
+ */
+public interface BytesStream extends Closeable {
+    /**
+     * The interface is based on {@link InputStream#read()} and follows the 
same contract.

Review Comment:
   Can we put the description inline and add a "see also" reference, rather 
than having nothing useful here? Same for the other methods.
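
   For illustration, the requested style might look like the sketch below. 
`DocumentedBytesStream` is a hypothetical name and the Javadoc wording is an 
assumption, not text from the PR; the contract it describes is that of 
`InputStream#read()`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of the documentation style; the wording is illustrative.
interface DocumentedBytesStream extends Closeable {
    /**
     * Reads the next byte of data from the underlying source. The byte is
     * returned as an int in the range 0 to 255. If no byte is available
     * because the end of the source has been reached, -1 is returned.
     *
     * @return the next byte of data, or -1 if the end of the source is reached
     * @throws IOException if an I/O error occurs
     * @see InputStream#read()
     */
    int read() throws IOException;
}
```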



##########
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks 
of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to 
sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call 
crosses JNI boundary.
+ * <p>
+ * The functionality of this stream is a combination of DataInput and 
BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()
+ * - Unlike BufferedInputStream, which allocates an intermediate buffer, this 
uses a buffer supplier to create the
+ * intermediate buffer
+ * - Unlike DataInputStream, the readByte method does not push the reading of 
a byte to sourceStream.
+ * <p>
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where 
multiple threads access this.
+ * - many method are un-supported in this class because they aren't currently 
used in the caller code.

Review Comment:
   Is this still true?



##########
raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java:
##########
@@ -215,14 +215,14 @@ private Batch<T> readBatch(DefaultRecordBatch batch) {
             }
 
             List<T> records = new ArrayList<>(numRecords);
-            DataInputStream input = new 
DataInputStream(batch.recordInputStream(bufferSupplier));
+            BytesStream input = batch.recordInputStream(bufferSupplier);
             try {
                 for (int i = 0; i < numRecords; i++) {
                     T record = readRecord(input, batch.sizeInBytes());
                     records.add(record);
                 }
             } finally {
-                Utils.closeQuietly(input, "DataInputStream");
+                Utils.closeQuietly(input, "RecordBatchBytesStream");

Review Comment:
   It's a bit odd to include this string when the type above is `BytesStream`.



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -157,7 +186,14 @@ public InputStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSu
      *                                    batch. As such, a supplier that 
reuses buffers will have a significant
      *                                    performance impact.
      */
-    public abstract InputStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier);
+    public abstract BytesStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier);
+
+    /**
+     * Recommended size of buffer for storing decompressed output.
+     */
+    public int getRecommendedDOutSize() {

Review Comment:
   We don't typically use `get` as a prefix for methods. Also, this name is a 
bit obscure. Could it be `decompressedOutputSize` or something like that?



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream 
buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, 
decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   Also, can you please remind me if the 16 KB number is different from what 
was there before for this case?



##########
clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java:
##########
@@ -29,6 +30,11 @@ public ByteBufferInputStream(ByteBuffer buffer) {
         this.buffer = buffer;
     }
 
+    @Override
+    public int available() throws IOException {

Review Comment:
   Did any tests fail as a result of this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
