[ 
https://issues.apache.org/jira/browse/KAFKA-732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13580942#comment-13580942
 ] 

Jun Rao commented on KAFKA-732:
-------------------------------

It seems that it's non-trivial to support shallow iteration in 0.8. The main 
reason is that the encoder API changed from encode(event: T) => Message to 
encode(event: T) => byte[]. In 0.7, we could get a compressed message from 
the source and simply pass it to the producer in MirrorMaker. In 0.8, there is 
no easy way to do that.

So, I suggest that we remove the shallowIteration option in ConsumerConfig and 
revisit this issue post 0.8.
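
To make the encoder point concrete, here is a minimal sketch in Python rather 
than Kafka's Scala. It is a toy model, not the actual Kafka code: the Message 
class, its codec field, and the functions encode_v07, encode_v08, wrap and 
read are all hypothetical stand-ins. It only assumes that a message carries a 
compression flag next to its payload, and that a byte[]-returning encoder has 
no place to put that flag.

```python
import gzip
from dataclasses import dataclass

NO_COMPRESSION = 0
GZIP = 1


@dataclass(frozen=True)
class Message:
    """Toy stand-in for a message: payload bytes plus a compression flag."""
    payload: bytes
    codec: int = NO_COMPRESSION


def encode_v07(event: Message) -> Message:
    # 0.7-style contract (encode returns a Message): an already-compressed
    # message passes through with its codec flag intact.
    return event


def encode_v08(event: Message) -> bytes:
    # 0.8-style contract (encode returns bytes): only the payload survives,
    # the codec flag is dropped.
    return event.payload


def wrap(payload: bytes) -> Message:
    # Assumption: the producer wraps encoder output in a fresh message that
    # is marked as uncompressed.
    return Message(payload, NO_COMPRESSION)


def read(message: Message) -> bytes:
    # Consumer side: decompress only when the flag says so.
    if message.codec == GZIP:
        return gzip.decompress(message.payload)
    return message.payload


original = b"hello kafka"
shallow = Message(gzip.compress(original), GZIP)  # what shallow iteration yields

# With the 0.7-style encoder the consumer recovers the original bytes.
print(read(encode_v07(shallow)) == original)
# With the 0.8-style encoder the consumer gets gzip bytes it never decompresses.
print(read(wrap(encode_v08(shallow))) == original)
```

In this model the second path loses the flag at the encoder boundary, which 
is the step that has no easy equivalent of the 0.7 pass-through.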
                
> MirrorMaker with shallow.iterator.enable=true produces unreadable messages
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-732
>                 URL: https://issues.apache.org/jira/browse/KAFKA-732
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions: 0.8
>            Reporter: Maxime Brugidou
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: p2
>
> Trying to use MirrorMaker between two 0.8 clusters.
> When using shallow.iterator.enable=true on the consumer side, the performance 
> gain is big (when incoming messages are compressed) and the producer does not 
> complain, but it writes the messages uncompressed, without the compression flag.
> If you try to:
> - enable compression on the producer, it obviously makes things worse since 
> the data gets double-compressed (the wiki warns about this)
> - disable compression, the compressed messages are written in bulk in an 
> uncompressed message, thus making them unreadable.
> If I follow the current state of the code correctly, from MirrorMaker to the 
> produce request, there is no way for the producer to know whether the message 
> is deep or not. So I wonder how it worked on 0.7?
> Here is the code as I read it (correct me if I'm wrong):
> 1. MirrorMakerThread.run(): create 
> KeyedMessage[Array[Byte],Array[Byte]](topic, message)
> 2. Producer.send() -> DefaultEventHandler.handle()
> 3. DefaultEventHandler.serialize(): use DefaultEncoder for the message (does 
> nothing)
> 4. DefaultEventHandler.dispatchSerializedData():
> 4.1 DefaultEventHandler.partitionAndCollate(): group messages by 
> broker/partition/topic
> 4.2 DefaultEventHandler.dispatchSerializedData(): cycle through each broker
> 4.3 DefaultEventHandler.groupMessagesToSet(): Create a ByteBufferMessageSet 
> for each partition/topic grouping all the messages together, and compressing 
> them if needed
> 4.4 DefaultEventHandler.send(): send the ByteBufferMessageSets for this 
> broker in one ProduceRequest
> The gist is that in DEH.groupMessagesToSet(), you don't know whether the raw 
> message in KeyedMessage.message is shallow or not. So I think I missed 
> something... Also, it doesn't seem possible to send a batch of deep messages 
> in one ProduceRequest.
> I would love to provide a patch (or if you tell me that I'm doing it wrong, 
> it's even better), since I can easily test it on my test clusters, but I will 
> need guidance here.
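
The two producer settings described in the report can be modelled with a short 
Python sketch. This is a toy illustration under stated assumptions, not the 
DefaultEventHandler code: group_messages_to_set and consume are hypothetical 
functions named after the steps above, the length-prefixed framing in pack and 
unpack is invented for the example, and gzip stands in for whichever codec is 
configured.

```python
import gzip
import struct


def pack(payloads):
    """Length-prefix and concatenate payloads (framing invented for this sketch)."""
    return b"".join(struct.pack(">I", len(p)) + p for p in payloads)


def unpack(data):
    """Inverse of pack(): split a byte string back into payloads."""
    out, pos = [], 0
    while pos < len(data):
        (size,) = struct.unpack_from(">I", data, pos)
        pos += 4
        out.append(data[pos:pos + size])
        pos += size
    return out


def group_messages_to_set(payloads, compress):
    """Toy version of step 4.3: build one message set, compressing if asked.

    Returns (compressed_flag, bytes). The function only sees raw bytes, so it
    cannot tell whether a payload is itself a compressed set.
    """
    data = pack(payloads)
    return (True, gzip.compress(data)) if compress else (False, data)


def consume(message_set):
    """Consumer on the target cluster: undo at most one level of compression."""
    compressed, data = message_set
    return unpack(gzip.decompress(data) if compressed else data)


originals = [b"event-1", b"event-2"]
# What a shallow iterator hands to MirrorMaker: one still-compressed blob.
shallow_payload = gzip.compress(pack(originals))

for compress in (True, False):
    seen = consume(group_messages_to_set([shallow_payload], compress))
    # Columns: producer compression, events recovered?, blob passed through?
    print(compress, seen == originals, seen == [shallow_payload])
```

In this model the consumer ends up with the compressed blob instead of the 
individual events under both settings. With compression on, the blob is 
compressed a second time; with compression off, it is written as an ordinary 
payload with no flag telling the consumer to decompress it.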
