[ https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma resolved KAFKA-6430.
--------------------------------
Resolution: Fixed
Fix Version/s: 1.1.0
> Improve Kafka GZip compression performance
> ------------------------------------------
>
> Key: KAFKA-6430
> URL: https://issues.apache.org/jira/browse/KAFKA-6430
> Project: Kafka
> Issue Type: Improvement
> Components: clients, compression, core
> Reporter: Ying Zheng
> Assignee: Ying Zheng
> Priority: Minor
> Fix For: 1.1.0
>
>
> To compress messages, Kafka uses a DataOutputStream on top of a GZIPOutputStream:
> new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
> To decompress messages, Kafka uses a DataInputStream on top of a GZIPInputStream:
> new DataInputStream(new GZIPInputStream(buffer));
> This is straightforward, but surprisingly inefficient. For each message, in
> addition to the key and value data, Kafka has to write roughly 30 bytes of
> metadata (the exact number varies slightly between Kafka versions), including
> the magic byte, checksum, timestamp, offset, key length, value length, etc.
> For most of these bytes, Java's DataOutputStream has to call write(int) once
> per byte. Here is the awkward writeInt() method in DataOutputStream, which
> writes the 4 bytes separately, in big-endian order:
> {code}
> public final void writeInt(int v) throws IOException {
>     out.write((v >>> 24) & 0xFF);
>     out.write((v >>> 16) & 0xFF);
>     out.write((v >>>  8) & 0xFF);
>     out.write((v >>>  0) & 0xFF);
>     incCount(4);
> }
> {code}
> Unfortunately, GZIPOutputStream does not override the single-byte write(int)
> method. It only overrides write(byte[], offset, len), which calls the
> corresponding JNI zlib function. The write(int) calls from DataOutputStream
> are therefore translated into write(byte[], offset, len) calls in a very
> inefficient way (Oracle JDK 1.8 code):
> {code}
> class DeflaterOutputStream {
>     public void write(int b) throws IOException {
>         byte[] buf = new byte[1];
>         buf[0] = (byte) (b & 0xff);
>         write(buf, 0, 1);
>     }
>
>     public void write(byte[] b, int off, int len) throws IOException {
>         if (def.finished()) {
>             throw new IOException("write beyond end of stream");
>         }
>         if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
>             throw new IndexOutOfBoundsException();
>         } else if (len == 0) {
>             return;
>         }
>         if (!def.finished()) {
>             def.setInput(b, off, len);
>             while (!def.needsInput()) {
>                 deflate();
>             }
>         }
>     }
> }
>
> class GZIPOutputStream extends DeflaterOutputStream {
>     public synchronized void write(byte[] buf, int off, int len)
>         throws IOException
>     {
>         super.write(buf, off, len);
>         crc.update(buf, off, len);
>     }
> }
>
> class Deflater {
>     private native int deflateBytes(long addr, byte[] b, int off, int len, int flush);
> }
>
> class CRC32 {
>     public void update(byte[] b, int off, int len) {
>         if (b == null) {
>             throw new NullPointerException();
>         }
>         if (off < 0 || len < 0 || off > b.length - len) {
>             throw new ArrayIndexOutOfBoundsException();
>         }
>         crc = updateBytes(crc, b, off, len);
>     }
>
>     private native static int updateBytes(int crc, byte[] b, int off, int len);
> }
> {code}
> For each metadata byte, the code above has to allocate a single-byte array,
> acquire several locks, and call two native JNI methods
> (Deflater.deflateBytes and CRC32.updateBytes). Each Kafka message carries
> roughly 30 such metadata bytes.
> The call stack of Deflater.deflateBytes():
> DeflaterOutputStream.write(int b) ->
> GZIPOutputStream.write(byte[] buf, int off, int len) ->
> DeflaterOutputStream.write(byte[] b, int off, int len) ->
> DeflaterOutputStream.deflate() ->
> Deflater.deflate(byte[] b, int off, int len) ->
> Deflater.deflate(byte[] b, int off, int len, int flush) ->
> Deflater.deflateBytes(long addr, byte[] b, int off, int len, int flush)
> The call stack of CRC32.updateBytes():
> DeflaterOutputStream.write(int b) ->
> GZIPOutputStream.write(byte[] buf, int off, int len) ->
> CRC32.update(byte[] b, int off, int len) ->
> CRC32.updateBytes(int crc, byte[] b, int off, int len)
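> To make the per-byte cost visible outside of Kafka, here is a small
> diagnostic sketch. The CountingOutputStream wrapper and the metadata field
> layout are illustrative (not Kafka's actual serialization code); it counts
> how often a metadata-shaped write sequence falls into the single-byte
> write(int) path:
> {code}
> import java.io.*;
> import java.util.zip.GZIPOutputStream;
>
> // Hypothetical diagnostic wrapper: counts single-byte vs. array writes
> // arriving from DataOutputStream.
> class CountingOutputStream extends FilterOutputStream {
>     long singleByteWrites = 0;
>     long arrayWrites = 0;
>
>     CountingOutputStream(OutputStream out) {
>         super(out);
>     }
>
>     @Override
>     public void write(int b) throws IOException {
>         singleByteWrites++;
>         out.write(b);
>     }
>
>     @Override
>     public void write(byte[] b, int off, int len) throws IOException {
>         arrayWrites++;
>         out.write(b, off, len);
>     }
> }
>
> public class WritePathDemo {
>     public static void main(String[] args) throws IOException {
>         CountingOutputStream counter = new CountingOutputStream(
>                 new GZIPOutputStream(new ByteArrayOutputStream()));
>         DataOutputStream out = new DataOutputStream(counter);
>
>         // Roughly the shape of one message's metadata: offset, size, crc,
>         // magic, attributes, timestamp, key length, value length.
>         out.writeLong(0L);   // offset     (writeLong batches into 1 array write)
>         out.writeInt(100);   // size       -> 4 single-byte writes
>         out.writeInt(0);     // crc        -> 4
>         out.writeByte(1);    // magic      -> 1
>         out.writeByte(0);    // attributes -> 1
>         out.writeLong(0L);   // timestamp  -> 1 array write
>         out.writeInt(4);     // key length -> 4
>         out.writeInt(4);     // value len  -> 4
>         out.close();
>
>         // Prints "18 single-byte writes, 2 array writes": each single-byte
>         // write allocates a 1-byte array and crosses into JNI twice
>         // (deflateBytes + updateBytes).
>         System.out.println(counter.singleByteWrites + " single-byte writes, "
>                 + counter.arrayWrites + " array writes");
>     }
> }
> {code}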
> At Uber, we found that adding a small buffer between DataOutputStream and
> GZIPOutputStream speeds up Kafka GZip compression by about 60% on average:
> {code}
> - return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
> + return new DataOutputStream(new BufferedOutputStream(new GZIPOutputStream(buffer, bufferSize), 1 << 14));
> {code}
> A similar issue also exists in GZip decompression; it can be fixed the same
> way by adding a BufferedInputStream between DataInputStream and
> GZIPInputStream, as sketched below.
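> A minimal sketch of the buffered construction for both paths (the class and
> method names here are illustrative, not the actual Kafka code; the 1 << 14
> buffer size matches the patch above):
> {code}
> import java.io.*;
> import java.util.zip.GZIPInputStream;
> import java.util.zip.GZIPOutputStream;
>
> public class BufferedGzipStreams {
>     static final int BUFFER_SIZE = 1 << 14;  // 16 KB, as in the patch above
>
>     static DataOutputStream wrapForOutput(OutputStream buffer, int bufferSize)
>             throws IOException {
>         // BufferedOutputStream batches the single-byte metadata writes, so
>         // GZIPOutputStream sees one array write per 16 KB instead of one
>         // pair of JNI round trips per byte.
>         return new DataOutputStream(new BufferedOutputStream(
>                 new GZIPOutputStream(buffer, bufferSize), BUFFER_SIZE));
>     }
>
>     static DataInputStream wrapForInput(InputStream buffer) throws IOException {
>         // Same idea on the read path: readInt()/readByte() now hit the
>         // BufferedInputStream instead of GZIPInputStream directly.
>         return new DataInputStream(new BufferedInputStream(
>                 new GZIPInputStream(buffer), BUFFER_SIZE));
>     }
> }
> {code}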
> We have tested this improvement on Kafka 0.10.2 / Oracle JDK 8 with
> production traffic at Uber:
> || Topic || Avg Message Size (bytes) || Vanilla Kafka Throughput (MB/s) || Kafka w/ GZip Buffer Throughput (MB/s) || Speedup ||
> | topic 1 | 197 | 10.9 | 21.9 | 2.0 |
> | topic 2 | 208 | 8.5 | 15.9 | 1.9 |
> | topic 3 | 624 | 15.3 | 20.2 | 1.3 |
> | topic 4 | 766 | 28.0 | 43.7 | 1.6 |
> | topic 5 | 1168 | 22.9 | 25.4 | 1.1 |
> | topic 6 | 165021 | 9.1 | 9.2 | 1.0 |
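> For anyone wanting to reproduce the effect outside a Kafka cluster, here is
> a rough standalone timing loop (not the benchmark behind the numbers above;
> the message shape, payload size, and counts are arbitrary):
> {code}
> import java.io.*;
> import java.util.zip.GZIPOutputStream;
>
> public class GzipBufferBench {
>     public static void main(String[] args) throws IOException {
>         byte[] payload = new byte[200];   // comparable to topic 1 / topic 2
>         int messages = 200_000;
>
>         for (boolean buffered : new boolean[] {false, true}) {
>             OutputStream gzip =
>                     new GZIPOutputStream(new ByteArrayOutputStream(), 8 * 1024);
>             DataOutputStream out = new DataOutputStream(
>                     buffered ? new BufferedOutputStream(gzip, 1 << 14) : gzip);
>
>             long start = System.nanoTime();
>             for (int i = 0; i < messages; i++) {
>                 out.writeLong(i);              // offset-like metadata
>                 out.writeInt(payload.length);  // length-like metadata
>                 out.write(payload);            // message body (array write)
>             }
>             out.close();
>             long ms = (System.nanoTime() - start) / 1_000_000;
>             System.out.println((buffered ? "buffered:   " : "unbuffered: ")
>                     + ms + " ms");
>         }
>     }
> }
> {code}
> Expect the buffered variant to be markedly faster for small messages, with
> the gap narrowing as message size grows, consistent with the table above.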
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)