[ https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631 ]

Guozhang Wang commented on KAFKA-1011:
--------------------------------------

Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be 
called over a set of messages that either all share the same key or all have a 
null key, we can set the key of the compressed wrapper message to that shared 
key (currently it is always written as null); see sketch 1 below.

2. Add an isShallow parameter to consumerIterator and KafkaStream, and pass 
the parameter from KafkaStream down to consumerIterator; in consumerIterator, 
if isShallow is true call currentDataChunk.messages.shallowIterator, otherwise 
call currentDataChunk.messages.iterator (sketch 2).

3. Also in consumerIterator, if isShallow is true, construct the 
MessageAndMetadata with its value assigned directly as the raw message: Message, 
instead of decoding it via fromBytes(Utils.readBytes(item.message.payload)) 
(sketch 3).

4. In MirrorMaker, set isShallow to true, and upon reading each msgAndMetadata 
from the stream, create a KeyedMessage[Array[Byte], Message] instead of a 
KeyedMessage[Array[Byte], Array[Byte]] (sketch 4).

5. Also in MirrorMaker, set the producer's CompressionCodec to NoCompression to 
avoid a second compression of the already-compressed messages (sketch 5).

6. Ordering in MirrorMaker will be preserved automatically, since the 
MirrorMaker producer's event handler uses the message key to decide the 
outgoing partition; compressed messages with the same key will therefore go to 
the same partition (sketch 6).
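
To make the steps concrete, here are rough sketches, one per step. They use 
simplified stand-in types (the Message, MessageSet, MessageAndMetadata and 
KeyedMessage below are minimal hypothetical models, not the real 
kafka.message / kafka.consumer / kafka.producer classes), so each compiles on 
its own; they illustrate the intent, not the actual patch.

Sketch 1: keying the compressed wrapper. Since all messages handed to 
ByteBufferMessageSet.create are assumed to share one key (or all have a null 
key), the wrapper can simply inherit that key instead of null.

{code}
// Sketch 1 (simplified stand-in types, not the real kafka.message classes).
object WrapperKeySketch {
  case class Message(key: Array[Byte], payload: Array[Byte])

  // All messages in the set share one key (or all have a null key), so the
  // compressed wrapper message inherits that key instead of null.
  def wrapperKey(messages: Seq[Message]): Array[Byte] =
    messages.headOption.map(_.key).orNull

  def main(args: Array[String]): Unit = {
    val k = "k1".getBytes("UTF-8")
    val batch = Seq(Message(k, "a".getBytes("UTF-8")),
                    Message(k, "b".getBytes("UTF-8")))
    println(new String(wrapperKey(batch), "UTF-8"))  // prints: k1
  }
}
{code}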
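
Sketch 2: threading isShallow from KafkaStream down to the consumer iterator; 
shallowIterator yields the compressed wrapper messages as-is, while the deep 
iterator yields the decompressed inner messages.

{code}
// Sketch 2 (simplified): choosing shallow vs. deep iteration per the flag.
object ShallowIteratorSketch {
  case class Message(payload: Array[Byte])

  class MessageSet(wrappers: List[Message], decompressed: List[Message]) {
    def shallowIterator: Iterator[Message] = wrappers.iterator   // wrappers as-is
    def iterator: Iterator[Message]        = decompressed.iterator
  }

  class ConsumerIteratorSketch(chunk: MessageSet, isShallow: Boolean)
      extends Iterator[Message] {
    private val underlying =
      if (isShallow) chunk.shallowIterator else chunk.iterator
    def hasNext: Boolean = underlying.hasNext
    def next(): Message  = underlying.next()
  }
}
{code}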
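
Sketch 3: in the shallow case the raw Message is handed through untouched; only 
the deep case decodes the payload.

{code}
// Sketch 3 (simplified): no fromBytes/readBytes decode in the shallow path.
object ShallowRecordSketch {
  case class Message(payload: Array[Byte])
  case class MessageAndMetadata[V](topic: String, offset: Long, message: V)

  def shallowRecord(topic: String, offset: Long, msg: Message): MessageAndMetadata[Message] =
    MessageAndMetadata(topic, offset, msg)            // raw Message, no decode

  def deepRecord(topic: String, offset: Long, msg: Message): MessageAndMetadata[Array[Byte]] =
    MessageAndMetadata(topic, offset, msg.payload)    // decoded payload bytes
}
{code}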
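
Sketch 4: with shallow iteration the stream hands MirrorMaker the compressed 
wrapper Message, so it forwards the Message itself, keyed by the wrapper's key 
from sketch 1, instead of decoded bytes.

{code}
// Sketch 4 (simplified): forwarding the wrapper Message unchanged.
object MirrorForwardSketch {
  case class Message(key: Array[Byte], payload: Array[Byte])
  case class KeyedMessage[K, V](topic: String, key: K, message: V)

  def toProducerRecord(topic: String, m: Message): KeyedMessage[Array[Byte], Message] =
    KeyedMessage(topic, m.key, m)   // Message value, not Array[Byte]
}
{code}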
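
Sketch 5: the producer-side setting, assuming the 0.8 producer property name 
compression.codec; disabling it keeps the already-compressed wrappers from 
being compressed a second time.

{code}
// Sketch 5: MirrorMaker producer properties with compression turned off.
import java.util.Properties

object NoRecompressionSketch {
  def producerProps(): Properties = {
    val props = new Properties()
    props.put("compression.codec", "none")  // do not re-compress the wrappers
    props
  }
}
{code}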
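
Sketch 6: a content-based hash partitioner in the style of the default 
partitioner; because every wrapper carries its original key (sketch 1), 
wrappers with the same key always land in the same outgoing partition, which 
preserves ordering.

{code}
// Sketch 6: same key => same partition, so per-key ordering is preserved.
object PartitionSketch {
  def partitionFor(key: Array[Byte], numPartitions: Int): Int =
    (java.util.Arrays.hashCode(key) & 0x7fffffff) % numPartitions
}
{code}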
                
> Decompression and re-compression on MirrorMaker could result in messages 
> being dropped in the pipeline
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1011
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1011
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>             Fix For: 0.8.1
>
>
> The way MirrorMaker works today is that its consumers use a deep iterator 
> to decompress messages received from the source brokers, and its producers 
> re-compress the messages while sending them to the target brokers. Since 
> MirrorMaker uses a centralized data channel for its consumers to pipe 
> messages to its producers, and since the producers compress all messages of 
> the same topic within a batch into a single produce request, messages 
> accepted at the front end of the pipeline can be dropped at the target 
> brokers of the MirrorMaker with a MessageSizeTooLargeException if one batch 
> happens to contain too many messages of the same topic in the MirrorMaker's 
> producer. If we use a shallow iterator at the MirrorMaker's consumer side to 
> pipe compressed messages through directly, this issue can be fixed. 
> Also, as Swapnil pointed out, currently if the MirrorMaker lags and there are 
> large messages in the MirrorMaker queue (large after decompression), it can 
> run into an OutOfMemoryError. Shallow iteration will be very helpful in 
> avoiding this error.
> The proposed solution of this issue is also related to KAFKA-527.
