Yes, we can expose a user callback in MM, just like we did for the
rebalance listener.
I still think ErrorLoggingCallback needs some change, though. Can we only
store the value bytes when logAsString is set to true? That looks more
reasonable to me.
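
A minimal sketch of that idea, assuming a simplified stand-in for the
producer callback interface. SendCallback, SizeAwareErrorLoggingCallback,
retainsValue and lastError are hypothetical names used only for
illustration; this is not the actual ErrorLoggingCallback or the patch
in KAFKA-2281.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the producer callback interface (not the Kafka API).
interface SendCallback {
    void onCompletion(Object metadata, Exception exception);
}

// Keeps a reference to the value bytes only when they will be logged as a string.
class SizeAwareErrorLoggingCallback implements SendCallback {
    private final String topic;
    private final byte[] value;        // null unless logAsString is true
    private final int valueLength;     // always recorded; -1 for a null value
    private final boolean logAsString;
    private String lastError;          // kept so the sketch can be inspected without a logger

    SizeAwareErrorLoggingCallback(String topic, byte[] value, boolean logAsString) {
        this.topic = topic;
        this.logAsString = logAsString;
        this.valueLength = (value == null) ? -1 : value.length;
        // Dropping the reference here lets the payload be collected once the
        // producer itself no longer needs it.
        this.value = logAsString ? value : null;
    }

    @Override
    public void onCompletion(Object metadata, Exception exception) {
        if (exception == null) {
            return; // nothing to log on success
        }
        String valueText = (logAsString && value != null)
                ? new String(value, StandardCharsets.UTF_8)
                : valueLength + " bytes";
        lastError = "Error when sending message to topic " + topic
                + " with value: " + valueText
                + " with error: " + exception.getMessage();
        System.err.println(lastError);
    }

    boolean retainsValue() {
        return value != null;
    }

    String lastError() {
        return lastError;
    }
}
```

With logAsString set to false the callback holds only an int per
in-flight message, so a slow destination broker no longer pins a second
copy of every payload on the heap.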

Jiangjie (Becket) Qin

On 6/21/15, 3:02 AM, "tao xiao" <xiaotao...@gmail.com> wrote:

>Yes, I agree with that. It is even better if we can supply our own
>callback. People who want to view the message content on failure can
>still do so.
>
>On Sun, Jun 21, 2015 at 2:20 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Tao / Jiangjie,
>>
>> I think a better fix here may be to stop MirrorMakerProducerCallback
>> from extending ErrorLoggingCallback, rather than changing
>> ErrorLoggingCallback itself, since that change defeats the purpose of
>> logAsString, which I think is useful for general error logging.
>> Instead, we can let MirrorMakerProducerCallback take just the length of
>> the value rather than the value bytes, if people agree that for MM we
>> are probably not interested in the message value in the callback.
>> Thoughts?
>>
>> Guozhang
>>
>> On Wed, Jun 17, 2015 at 1:06 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>
>> > Thank you for the reply.
>> >
>> > Patch submitted https://issues.apache.org/jira/browse/KAFKA-2281
>> >
>> > On Mon, 15 Jun 2015 at 02:16 Jiangjie Qin <j...@linkedin.com.invalid>
>> > wrote:
>> >
>> > > Hi Tao,
>> > >
>> > > Yes, the issue that ErrorLoggingCallback keeps the value as a local
>> > > variable has been known for a while, and we should probably fix it,
>> > > as the value is not used except for logging its size. Can you open a
>> > > ticket and maybe also submit a patch?
>> > >
>> > > As for the unreachable objects, I don't think it is a memory leak. As
>> > > you said, GC should take care of this. At LinkedIn we are using G1GC
>> > > with some tunings made by our SRE. You can try that if interested.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On 6/13/15, 11:39 AM, "tao xiao" <xiaotao...@gmail.com> wrote:
>> > >
>> > > >Hi,
>> > > >
>> > > >I am using mirror maker in trunk to replicate data across two data
>> > > >centers. While the destination broker was under heavy load and
>> > > >unresponsive, the send rate of mirror maker was very low and the
>> > > >available producer buffer quickly filled up. In the end mirror maker
>> > > >threw an OOME. The detailed exception can be found here:
>> > > >
>> > > >https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-exception-L1
>> > > >
>> > > >I started up mirror maker with 1G memory and a 256M producer buffer. I
>> > > >used Eclipse MAT to analyze the heap dump and found that the retained
>> > > >heap size of all RecordBatch objects was more than 500MB. Half of that
>> > > >was used to retain data waiting to be sent to the destination broker,
>> > > >which makes sense to me as it is close to the 256MB producer buffer.
>> > > >The other half was used by
>> > > >kafka.tools.MirrorMaker$MirrorMakerProducerCallback, because every
>> > > >producer callback in mirror maker takes the message value and holds it
>> > > >until the message is successfully delivered. In my case, since the
>> > > >destination broker was very unresponsive, the message value held by the
>> > > >callback would stay forever, which I think is a waste and a major
>> > > >contributor to the OOME issue. Screenshot of MAT:
>> > > >
>> > > >https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-mat-screenshot-png
>> > > >
>> > > >The other interesting problem I observed is that when I turned on
>> > > >unreachable object parsing in MAT, more than 400MB of memory was
>> > > >occupied by unreachable objects. It surprised me that GC didn't clean
>> > > >them up before the OOME was thrown. According to the GC log
>> > > >
>> > > >https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-gc-log-L1
>> > > >
>> > > >Full GC was unable to reclaim any memory, yet when facing an OOME these
>> > > >unreachable objects should have been cleaned up. So either Eclipse MAT
>> > > >has an issue parsing the heap dump or there is a hidden memory leak that
>> > > >is hard to find. I attached a sample screenshot of the unreachable
>> > > >objects here:
>> > > >
>> > > >https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-unreachable-objects-png
>> > > >
>> > > >The consumer properties
>> > > >
>> > > >zookeeper.connect=zk
>> > > >zookeeper.connection.timeout.ms=1000000
>> > > >group.id=mm
>> > > >auto.offset.reset=smallest
>> > > >partition.assignment.strategy=roundrobin
>> > > >
>> > > >The producer properties
>> > > >
>> > > >bootstrap.servers=brokers
>> > > >client.id=mirror-producer
>> > > >producer.type=async
>> > > >compression.codec=none
>> > > >serializer.class=kafka.serializer.DefaultEncoder
>> > > >key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
>> > > >value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
>> > > >buffer.memory=268435456
>> > > >batch.size=1048576
>> > > >max.request.size=5242880
>> > > >send.buffer.bytes=1048576
>> > > >
>> > > >The java command to start mirror maker
>> > > >java -Xmx1024M -Xms512M -XX:+HeapDumpOnOutOfMemoryError
>> > > >-XX:HeapDumpPath=/home/kafka/slc-phx-mm-cg.hprof
>> > > >-XX:+PrintTenuringDistribution -XX:MaxTenuringThreshold=3 -server
>> > > >-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
>> > > >-XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC
>> > > >-Djava.awt.headless=true
>> > > >-Xloggc:/var/log/kafka/kafka-phx/cg/mirrormaker-gc.log -verbose:gc
>> > > >-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
>> > > >-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
>> > > >-XX:GCLogFileSize=10M -Dcom.sun.management.jmxremote
>> > > >-Dcom.sun.management.jmxremote.authenticate=false
>> > > >-Dcom.sun.management.jmxremote.ssl=false
>> > > >-Dkafka.logs.dir=/var/log/kafka/kafka-phx/cg
>> > > >-Dlog4j.configuration=file:/usr/share/kafka/bin/../config/tools-log4j.properties
>> > > >-cp libs/* kafka.tools.MirrorMaker --consumer.config
>> > > >consumer.properties --num.streams 10 --producer.config
>> > > >producer.properties --whitelist test.*
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
