[ https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-527:
--------------------------------

    Attachment: KAFKA-527.message-copy.history

Attached a file summarizing the copy operations. If we count the shallow 
iteration over message sets, there are about 8 copy operations over the 
lifetime of a message. Among them, Copies 3/4/5 could be avoided, leaving only 
one shallow iteration on each broker (leader and follower) and on the consumer.
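
For reference, here is a minimal sketch of what a shallow iteration over a 
message set means, assuming the usual [offset: 8 bytes][size: 4 bytes][message 
bytes] entry layout; the names (Entry, shallowEntries) are illustrative only 
and not the actual ByteBufferMessageSet API:

import java.nio.ByteBuffer

// Minimal stand-in for the message-set layout used on the wire and on disk:
//   [offset: 8 bytes][size: 4 bytes][message bytes: size], repeated.
// Shallow iteration walks these entries without decompressing wrapper
// (compressed) messages; deep iteration would additionally decompress each
// wrapper and recurse into its inner messages.
object ShallowIterationSketch {
  final case class Entry(offset: Long, payload: ByteBuffer)

  def shallowEntries(messageSet: ByteBuffer): Iterator[Entry] = new Iterator[Entry] {
    private val buf = messageSet.duplicate()
    def hasNext: Boolean = buf.remaining() >= 12
    def next(): Entry = {
      val offset  = buf.getLong()
      val size    = buf.getInt()
      val payload = buf.slice()
      payload.limit(size)                  // view over just this message's bytes, no copy
      buf.position(buf.position() + size)  // skip straight to the next entry
      Entry(offset, payload)
    }
  }
}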

Given that offset assignment is currently the main reason for converting 
between an array of Messages and a ByteBufferMessageSet (instead of appending 
Messages to a ByteBufferMessageSet), and that trimming invalid bytes on the 
consumer complicates its logic, one proposal for refactoring compression while 
remaining backward compatible is:

1) When serving a fetch request, return only complete Messages within the 
requested fetch size, so the consumer does not need to trim invalid bytes.
2) Use the first 6 bits of the attribute byte of Message to indicate the 
number of real messages it wraps (see the first sketch after this list). This 
limits the number of compressed messages in one wrapper to 64, which I think 
is good enough.
3) When doing dedup, delete only the payload of the message but keep its 
counter, so that the offsets remain consecutively incremental.
4) For offset assignment on the broker, the ByteBufferMessageSet will contain 
multiple wrapper Messages (since the compressed batch size is now limited to 
64 messages, one ProducerRequest may carry a ByteBufferMessageSet with 
multiple wrapper messages); we need to use the shallowIterator to iterate over 
them and reassign offsets without decompressing/recompressing (see the second 
sketch after this list).
5) On the consumer side, allow shallow iteration as well as deep iteration; 
with deep iteration, the consumer iterator needs to assign an offset to each 
decompressed message based on its wrapper message's offset and size (also 
covered in the second sketch below). Also add the compression codec to the 
MessageAndMetadata returned to the consumers, which will then solve KAFKA-1011.
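
To make item 2 concrete, here is a hedged sketch of one way the counter could 
be packed into the attribute byte. It assumes the existing layout where the 
lowest 2 bits hold the compression codec, leaving 6 free bits, and it stores 
count - 1 so that a wrapper can hold 1..64 messages; all names here are made 
up for illustration:

// Illustrative helpers for packing the wrapped-message counter into the
// attribute byte. The lowest 2 bits keep the compression codec; the upper
// 6 bits carry (count - 1), so counts 1..64 are representable.
object AttributeCounterSketch {
  val CompressionCodecMask = 0x03 // existing 2-bit codec field
  val CounterShift         = 2
  val MaxWrappedMessages   = 64   // 2^6 distinct values

  def encodeAttributes(codec: Int, wrappedCount: Int): Byte = {
    require(wrappedCount >= 1 && wrappedCount <= MaxWrappedMessages)
    (((wrappedCount - 1) << CounterShift) | (codec & CompressionCodecMask)).toByte
  }

  def decodeCodec(attributes: Byte): Int = attributes & CompressionCodecMask

  def decodeWrappedCount(attributes: Byte): Int =
    ((attributes & 0xFF) >>> CounterShift) + 1
}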
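
And a sketch of items 4 and 5, reusing the entry layout assumed above. It 
assumes one possible convention, namely that the wrapper message is assigned 
the offset of the last message it wraps, so a wrapper holding n messages 
consumes n consecutive offsets and the consumer can recover the inner offsets 
from the wrapper's offset and count; wrappedCountOf is a hypothetical hook 
that would read the counter from the attribute byte as in item 2:

import java.nio.ByteBuffer

object OffsetAssignmentSketch {
  // Broker side (item 4): walk the buffer shallowly and rewrite only the 8-byte
  // offset field of each wrapper entry; the compressed payload is never touched.
  // Returns the next offset to assign after this message set.
  def assignOffsets(messageSet: ByteBuffer, firstOffset: Long,
                    wrappedCountOf: ByteBuffer => Int): Long = {
    val buf = messageSet.duplicate()
    var nextOffset = firstOffset
    while (buf.remaining() >= 12) {
      val entryStart = buf.position()
      buf.getLong()                        // old offset, ignored
      val size    = buf.getInt()
      val payload = buf.slice()
      payload.limit(size)
      val n = wrappedCountOf(payload)      // how many messages this wrapper holds
      buf.putLong(entryStart, nextOffset + n - 1) // wrapper takes the last inner offset
      nextOffset += n
      buf.position(entryStart + 12 + size)
    }
    nextOffset
  }

  // Consumer side (item 5): on deep iteration, derive each inner message's
  // offset from the wrapper's offset and the number of messages it wraps.
  def innerOffsets(wrapperOffset: Long, wrappedCount: Int): Seq[Long] =
    (0 until wrappedCount).map(i => wrapperOffset - (wrappedCount - 1) + i)
}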

With this we can completely remove de/re-compression on the brokers and always 
use the compressed wrapper message as the unit throughout the pipeline. The 
downside of this approach is that it complicates the logic on the producer 
(64-message compression batch limit) and on the consumer (re-computing message 
offsets).

> Compression support does numerous byte copies
> ---------------------------------------------
>
>                 Key: KAFKA-527
>                 URL: https://issues.apache.org/jira/browse/KAFKA-527
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jay Kreps
>         Attachments: java.hprof.no-compression.txt, java.hprof.snappy.text, 
> KAFKA-527.message-copy.history
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream/OutputStream interfaces and the 
> Message/MessageSet interfaces, which are based on byte buffers, is the cause 
> of many of these copies.



--
This message was sent by Atlassian JIRA
(v6.1#6144)
