[ https://issues.apache.org/jira/browse/KAFKA-15057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Divij Vaidya reassigned KAFKA-15057:
------------------------------------

    Assignee:     (was: Divij Vaidya)

> Use new interface ZstdBufferDecompressingStreamNoFinalizer from zstd-jni
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-15057
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15057
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 3.6.0
>            Reporter: Divij Vaidya
>            Priority: Major
>              Labels: performance
>         Attachments: zstd-upgrade.xlsx
>
>
> h1. Background
> In Kafka's code, every batch of records is stored in an in-memory byte buffer. For a compressed workload, this buffer contains the data in compressed form. Before writing the batch to the log, Kafka performs validations such as checking that offsets are monotonically increasing. To perform these validations, Kafka needs to decompress the data stored in the byte buffer.
> For zstd-compressed batches, Kafka uses the ZstdInputStreamNoFinalizer interface provided by the zstd-jni library to perform decompression. ZstdInputStreamNoFinalizer takes an InputStream as input and provides an InputStream as output. Since Kafka stores the entire batch in a ByteBuffer, it wraps the ByteBuffer in an InputStream to satisfy the input contract of ZstdInputStreamNoFinalizer.
> h1. Problem
> ZstdInputStreamNoFinalizer is not a good fit for our use case because we already have the entire compressed data stored in a buffer. We have no need for an interface that takes an InputStream as input. Our requirement is for an interface that takes a ByteBuffer as input and provides a stream of uncompressed data as output. Prior to zstd-jni 1.5.5, no such interface existed, so we were forced to use ZstdInputStreamNoFinalizer.
> Using ZstdInputStreamNoFinalizer has the following problems:
> 1. When decompression of a batch is complete, we try to read one more byte to check whether the actual batch size is equal to the declared batch size.
>    This is done in RecordIterator#next(). With the existing interface, this extra read of one more byte leads to a JNI call.
> 2. Since this interface requires an InputStream as input, we take the ByteBuffer containing the compressed batch and convert it into an InputStream. Internally, the interface uses an intermediate buffer to read data from this InputStream in chunks. The chunk size is determined by the underlying zstd library, and hence we allocate a new buffer for every batch. This leads to the following transformation: ByteBuffer (compressed batch) -> InputStream (compressed batch) -> data copy to intermediate ByteBuffer (chunk of compressed batch) -> send chunk to zstd library for decompression -> refill the intermediate buffer by copying the next chunk of the compressed batch into it.
> h1. Solution
> I have extended an interface in the zstd-jni library to suit Kafka's use case. The new interface is called ZstdBufferDecompressingStreamNoFinalizer. It takes a ByteBuffer containing compressed data as input and provides an InputStream as output. It solves the above problems as follows:
> 1. When the final decompressed frame is read, this interface sets a flag to mark that all uncompressed data has been consumed. When RecordIterator#next() tries to determine whether the stream has ended, we simply read the flag and therefore do not have to make a JNI call.
> 2. It does not require any buffer allocation for input. It takes the input buffer and passes it across the JNI boundary without any intermediate copying, so no input buffer is allocated.
> h1. References
> h2. Changes in zstd-jni
> Add new interface - [https://github.com/luben/zstd-jni/commit/d65490e8b8aadc4b59545755e55f7dd368fe8aa5]
> Bug fixes in new interface -
> [https://github.com/luben/zstd-jni/commit/8bf8066438785ce55b62fc7e6816faafe1e3b39e]
> [https://github.com/luben/zstd-jni/commit/100c434dfcec17a865ca2c2b844afe1046ce1b10]
> [https://github.com/luben/zstd-jni/commit/355b8511a2967d097a619047a579930cac2ccd9d]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
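The Solution section of the issue rests on two properties: the whole compressed batch is handed to the decompressor as a ByteBuffer, and end-of-stream is recorded in a flag instead of being discovered by one more read. The Java sketch below illustrates that pattern under a loudly stated assumption: it uses java.util.zip.Inflater (Java 11+, whose setInput accepts a ByteBuffer) as a stand-in for zstd. The class name BufferDecompressingStreamSketch and its isFinished() method are hypothetical; this is neither zstd-jni's ZstdBufferDecompressingStreamNoFinalizer API nor Kafka's code.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch, NOT zstd-jni's ZstdBufferDecompressingStreamNoFinalizer:
// java.util.zip.Inflater (Java 11+) stands in for the zstd decompressor.
final class BufferDecompressingStreamSketch extends InputStream {
    private final Inflater inflater = new Inflater();
    private final byte[] single = new byte[1];
    // Set by the inflate call that reaches the end of the compressed data.
    private boolean finished = false;

    BufferDecompressingStreamSketch(ByteBuffer compressedBatch) {
        // The whole batch goes to the codec as a ByteBuffer: no InputStream
        // wrapper and no intermediate chunk buffer on the input side.
        inflater.setInput(compressedBatch);
    }

    // End-of-batch check answered from the flag, with no further codec call.
    boolean isFinished() {
        return finished;
    }

    @Override
    public int read() throws IOException {
        int n = read(single, 0, 1);
        return n == -1 ? -1 : single[0] & 0xFF;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        Objects.checkFromIndexSize(off, len, b.length);
        if (len == 0) {
            return 0;
        }
        if (finished) {
            return -1; // answered from the flag; the codec is not touched
        }
        try {
            while (true) {
                int n = inflater.inflate(b, off, len);
                if (inflater.finished()) {
                    finished = true;
                }
                if (n > 0) {
                    return n;
                }
                if (finished) {
                    return -1;
                }
                if (inflater.needsDictionary()) {
                    throw new IOException("preset dictionaries are not supported in this sketch");
                }
                if (inflater.needsInput()) {
                    // The whole batch was supplied up front, so running dry means truncation.
                    throw new EOFException("compressed batch is truncated");
                }
            }
        } catch (DataFormatException e) {
            throw new IOException("corrupt compressed batch", e);
        }
    }

    @Override
    public void close() {
        inflater.end(); // release native zlib state
    }

    // Demo helper: deflate a byte array into one compressed "batch".
    static byte[] compress(byte[] data) {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[512];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] batch = "offset=0,offset=1,offset=2;".repeat(200).getBytes(StandardCharsets.UTF_8);
        try (BufferDecompressingStreamSketch in =
                 new BufferDecompressingStreamSketch(ByteBuffer.wrap(compress(batch)))) {
            byte[] decompressed = in.readAllBytes();
            System.out.println(decompressed.length == batch.length); // true
            System.out.println(in.isFinished());                      // true
        }
    }
}
```

In this stand-in, the flag becomes true only as a side effect of the inflate call that consumes the end of the compressed stream, so a caller that stops reading exactly at the declared batch size may still observe false. The sketch makes no claim that zstd-jni's frame handling behaves identically.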