Divij Vaidya created KAFKA-14629:
------------------------------------

             Summary: Performance improvement for Zstd compressed workload
                 Key: KAFKA-14629
                 URL: https://issues.apache.org/jira/browse/KAFKA-14629
             Project: Kafka
          Issue Type: Improvement
          Components: core
            Reporter: Divij Vaidya
            Assignee: Divij Vaidya
         Attachments: benchmark-jira.xlsx

h2. Motivation

From a CPU flamegraph analysis for a compressed workload (openmessaging), we 
have observed that the ValidateMessagesAndAssignOffsets method takes 75% of the 
total CPU/time taken by UnifiedLog.appendAsLeader(). The improvements 
suggested below will reduce CPU usage and increase throughput.
h2. Background

A producer appends multiple records to a batch and compresses the batch. This 
compressed batch, along with some headers, is sent to the server. The server 
performs a checksum on the batch to validate data integrity after the network 
transfer. At this point the batch payload is still in compressed form. The 
broker then tries to append this batch to the log. Before appending, the broker 
performs schema integrity validation on individual records, for example 
checking that record offsets are monotonically increasing. To perform these 
validations, the server has to decompress the batch.

The schema validation of a batch on the server is done by decompressing and 
validating individual records. For each record, the validation needs to read 
all fields of the record except the key and value. [1]
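
As a hedged illustration (not Kafka's actual code; the field layout is simplified and the varint helper is written inline), the per-record validation read looks roughly like this in Java: every metadata field is decoded, while the key and value bytes are merely skipped, although skipping still forces them through decompression.
{code:java}
import java.io.DataInputStream;
import java.io.IOException;

// Illustrative sketch only: shows why validating a record reads every field
// except key/value, which can merely be skipped in the decompressed stream.
public class RecordValidationSketch {

    // Simplified varint reader (Kafka uses ByteUtils.readVarint); zig-zag decoded.
    static int readVarint(DataInputStream in) throws IOException {
        int value = 0, shift = 0, b;
        do {
            b = in.readUnsignedByte();
            value |= (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (value >>> 1) ^ -(value & 1);
    }

    static void validateOneRecord(DataInputStream in) throws IOException {
        int sizeInBytes = readVarint(in);     // record length
        byte attributes = in.readByte();      // attributes
        long timestampDelta = readVarint(in); // a varlong in the real format; simplified here
        int offsetDelta = readVarint(in);     // needed to check offsets are monotonic

        int keyLength = readVarint(in);       // key is not needed for validation ...
        if (keyLength > 0)
            in.skipBytes(keyLength);          // ... so it is only skipped

        int valueLength = readVarint(in);     // value is not needed either ...
        if (valueLength > 0)
            in.skipBytes(valueLength);        // ... but skipping still decompresses it

        // headers (count plus each header) are also read; omitted for brevity
    }
}
{code}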
h2. Performance requirements

Pre-allocation of arrays should not add excessive overhead for batches with small 
records → for example, allocating a 65KB array for a 1KB record is overkill and 
negatively impacts performance for small requests.

Overhead of skipping bytes should be minimal → we don’t need to read the key/value 
of a record, which on average is the largest part of a record. The implementation 
should skip the key/value bytes efficiently.

Minimize the number of JNI calls → JNI calls are expensive and work best when 
fewer calls are made to decompress/compress the same amount of data.

Minimize new byte array/buffer allocation → ideally, the only array allocation 
would be the one used to store the result of decompression. Even this could be 
optimized by using buffers backed by direct memory or by re-using the same 
buffer, since we process one record at a time.
h2. Current implementation - decompression + zstd

We allocate a 2KB array called skipArray to store decompressed data [2]. This 
array is re-used for the scope of a batch (i.e. across all records). 

We allocate a 16KB array to buffer the data between skipArray and the underlying 
zstd-jni library calls [3]. The motivation is to read at least 16KB of data at a 
time in a single call to the JNI layer. This array is re-used for the scope of a 
batch (i.e. across all records).

We provide a BufferPool to zstd-jni. It uses this pool to create buffers for 
its own use, i.e. one allocation per batch and one allocation per skip() call. 
Note that this pool is not used to store the output of decompression. 
Currently, the BufferPool we use is scoped to a thread. 
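
A simplified sketch of this layering (using zstd-jni's public BufferPool/ZstdInputStreamNoFinalizer API and Kafka's BufferSupplier/ByteBufferInputStream utilities; the exact wiring in ZStdFactory may differ slightly):
{code:java}
import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferInputStream;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Sketch of the current decompression path for a zstd-compressed batch.
public class ZstdWrappingSketch {

    public static InputStream wrapForInput(ByteBuffer compressedBatch,
                                           BufferSupplier decompressionBufferSupplier) throws IOException {
        // zstd-jni asks this pool for the buffers it needs internally
        // (roughly one allocation per batch plus one per skip() call).
        BufferPool pool = new BufferPool() {
            @Override
            public ByteBuffer get(int capacity) {
                return decompressionBufferSupplier.get(capacity);
            }

            @Override
            public void release(ByteBuffer buffer) {
                decompressionBufferSupplier.release(buffer);
            }
        };

        // 16KB intermediate buffer between the caller and the JNI layer, so each JNI
        // call decompresses a reasonably large chunk even if the caller reads a few
        // bytes at a time. The 2KB skipArray in DefaultRecordBatch sits on top of this.
        return new BufferedInputStream(
                new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(compressedBatch), pool),
                16 * 1024);
    }
}
{code}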


h2. Potential improvements
 # Do not read the end of the batch, since it contains the key/value of the last 
record. Instead of “skipping” it (which would still require decompression), we 
can simply not read it at all.
 # Remove the two layers of buffers (the 16KB one and the 2KB one) and replace 
them with a single buffer called decompressionBuffer. The time it takes to 
prepare a batch for decompression is bounded by the allocation of the largest 
buffer, hence using only one large buffer (16KB) does not cause any regression.
 # Use BufferSupplier to allocate the intermediate decompression buffer (see the 
sketch after this list).
 # Calculate the size of the decompression buffer dynamically at runtime, e.g. 
based on the recommendation provided by Zstd. It is currently fixed at 16KB. 
Using the value recommended by Zstd saves a copy in native code. 
[https://github.com/facebook/zstd/issues/340]
 # Provide a pool of direct buffers to zstd-jni for its internal usage. Direct 
buffers are ideal for scenarios where data is transferred across the JNI 
boundary, such as (de)compression. The latest version of zstd-jni works with 
direct buffers.
 # Read the network input into a direct buffer and pass that to zstd-jni for 
decompression. Store the output in a direct buffer as well.
 # Use the dictionary functionality of decompression. Train the dictionary on 
the first few MBs and then use it.
 # Use the skip functionality of zstd-jni and do not bring “skipped” data into 
the Kafka layer, so that we don’t need a buffer in Kafka to store skipped data. 
This could be done by using DataInputStream and removing the intermediate 
buffer stream (the 16KB one).
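
A hypothetical sketch of suggestions #2 and #3 combined (class and field names are illustrative, not an actual Kafka class): a single buffer obtained from a BufferSupplier sits between the record reader and the zstd stream, replacing both the skipArray and the BufferedInputStream.
{code:java}
import org.apache.kafka.common.utils.BufferSupplier;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch: one intermediate decompressionBuffer, obtained from a
// (thread-scoped) BufferSupplier, replaces the 2KB skipArray and the 16KB
// BufferedInputStream. Assumes the supplier returns heap (array-backed) buffers.
public class SingleBufferDecompressionSketch extends InputStream {

    private final InputStream zstdStream;         // stream returned by zstd-jni
    private final BufferSupplier bufferSupplier;  // buffers are reused across batches
    private final ByteBuffer decompressionBuffer; // the only Kafka-layer intermediate buffer

    public SingleBufferDecompressionSketch(InputStream zstdStream,
                                           BufferSupplier bufferSupplier,
                                           int bufferSize) {
        this.zstdStream = zstdStream;
        this.bufferSupplier = bufferSupplier;
        this.decompressionBuffer = bufferSupplier.get(bufferSize);
        this.decompressionBuffer.flip(); // start empty so the first read triggers a refill
    }

    @Override
    public int read() throws IOException {
        if (!decompressionBuffer.hasRemaining() && !refill())
            return -1;
        return decompressionBuffer.get() & 0xff;
    }

    // Pull the next chunk out of the zstd stream in one large read, so that the
    // underlying JNI layer is invoked for big chunks rather than per record field.
    private boolean refill() throws IOException {
        decompressionBuffer.clear();
        int n = zstdStream.read(decompressionBuffer.array(), 0, decompressionBuffer.capacity());
        if (n < 0)
            return false;
        decompressionBuffer.limit(n);
        return true;
    }

    @Override
    public void close() throws IOException {
        try {
            zstdStream.close();
        } finally {
            bufferSupplier.release(decompressionBuffer);
        }
    }
}
{code}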

h2. Prototype implementation

[https://github.com/divijvaidya/kafka/commits/optimize-compression] 
h2. JMH benchmark of prototype

After implementing suggestion #2 and suggestion #3, we observe a 10-25% 
improvement in throughput over the existing implementation for large message 
sizes and a 0-2% improvement in throughput for small message sizes. Note that we 
expect this performance to improve further in production because the 
thread-scoped cached memory pool will be re-used to a greater extent there. For 
detailed results, see the attached benchmark.
h2. References

[1] LogValidator.java 
[2] DefaultRecordBatch → skipArray
[3] ZStdFactory


