Hi Kishore Senji,

The segment file size is the default, 1GB.

According to LogManager.scala#cleanupExpiredSegments, Kafka only deletes
segments whose lastModTime is older than retention.ms, so I don't think this
is the reason for my data loss. Actually, I lost some data in a topic other
than the one whose retention I reduced...
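
To make the retention rule above concrete, here is a minimal Java sketch of the time-based check as described: a segment becomes eligible for deletion once the current time minus its last-modified time exceeds retention.ms. The Segment class and the expired helper are hypothetical stand-ins for illustration only, not Kafka's actual classes, and the real cleanup code may apply further conditions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RetentionSketch {
    // Hypothetical stand-in for a log segment: only the fields the check needs.
    static final class Segment {
        final String name;
        final long lastModifiedMs;

        Segment(String name, long lastModifiedMs) {
            this.name = name;
            this.lastModifiedMs = lastModifiedMs;
        }
    }

    // Returns the segments whose age (nowMs - lastModifiedMs) exceeds retentionMs.
    static List<Segment> expired(List<Segment> segments, long retentionMs, long nowMs) {
        List<Segment> out = new ArrayList<>();
        for (Segment s : segments) {
            if (nowMs - s.lastModifiedMs > retentionMs) {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long nowMs = 1_000_000_000L; // arbitrary "current time" for the example
        List<Segment> segments = Arrays.asList(
            new Segment("older", nowMs - 200_000_000L), // a bit over 2 days old
            new Segment("newer", nowMs - 10_000_000L)); // under 3 hours old

        // 10-day retention (864,000,000 ms) versus 1-day retention (86,400,000 ms).
        System.out.println(expired(segments, 864_000_000L, nowMs).size());
        System.out.println(expired(segments, 86_400_000L, nowMs).size());
    }
}
```

Under these assumptions, shrinking retention.ms from 10 days to 1 day makes the older segment eligible immediately, while a segment modified within the last day is untouched. That is why a time-based cleanup alone would not explain loss in a topic whose retention was not changed.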

I don't know whether destaging these several-GB files would cause this kind
of system chattering, though our hardware is not very fancy.

2015-08-18 7:48 GMT+08:00 Kishore Senji <kse...@gmail.com>:

> What is the size of the segment file? You are reducing the retention from
> 10 days to 1 day. The moment you do this, it will delete all segments which
> are older than 1 day. So for example, if your latest segment is older than
> 1 day and if there are consumers which are still catching up (let us say 10
> min lag), Kafka will roll over and delete the older segments and there is
> potential for data loss. One pattern could be to make sure you change this
> config parameter only when a new segment is created and all consumers are
> on the new segment and also make sure all clients will be done with the
> segment before the file is deleted.
>
> My guess is that your segment file is huge and the OS may be taking a long
> time to destage the file cache to disk before letting the file be deleted.
> This could be the reason for the long pause, which could be causing the ZK
> connections to time out.
>
>
>
> On Mon, Aug 17, 2015 at 6:59 AM Zhao Weinan <zhaow...@gmail.com> wrote:
>
> > Hi Kishore Senji,
> >
> > Thanks for the reply.
> >
> > Do you have any suggestions until a fix comes out? Should I avoid modifying
> > retention.ms? Or disable auto rebalance? This problem is 100% reproducible
> > in my scenario (I hit the deadlock both times I modified retention.ms), and
> > I even found some data loss, whose cause I'm still looking for.
> >
> > Any idea why shrinking retention.ms makes the network unstable?
> >
> > And yes I use the comma for clarity :)
> >
> > 2015-08-17 8:59 GMT+08:00 Kishore Senji <kse...@gmail.com>:
> >
> > > Interesting problem you ran into. It seems this broker was chosen as the
> > > Controller and the onControllerFailover() method was called. This
> > > schedules the checkAndTriggerPartitionRebalance method to execute after 5
> > > seconds (when auto rebalance is enabled). In the meantime this broker lost
> > > its ZooKeeper connection, so it resigned from Controller status and the
> > > onControllerResignation() method was called. That method tries to shut down
> > > the auto rebalance executor, but it does so while holding the lock, and
> > > this is what caused the deadlock in your case.
> > >
> > > I do not think we need to hold the lock to shut down the executor. This
> > > could be the fix we need.
> > >
> > > The retention.ms config parameter should not have commas in its value. Are
> > > you just using them here to clarify for us?
> > >
> > > It so happened in your
> > > On Sun, Aug 16, 2015 at 1:52 AM Zhao Weinan <zhaow...@gmail.com> wrote:
> > >
> > > > Hi guys,
> > > >
> > > > I ran into this problem: after changing one topic's config to
> > > > retention.ms=86,400,000 from 864,000,000, the brokers started to schedule
> > > > and perform deletions of the outdated index of that topic.
> > > >
> > > > Then, for some reason, some brokers' connections to ZooKeeper expired,
> > > > and suddenly lots of ERRORs showed up in logs/server.log. On the
> > > > controller broker (id=5):
> > > >
> > > > > *ERROR [ReplicaFetcherThread-2-4], Error for partition [XXXXX,4] to broker 4:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread).*
> > > > >
> > > > On the other broker, which the controller broker tries to fetch from:
> > > >
> > > > > *[Replica Manager on Broker 4]: Fetch request with correlation id 1920630 from client ReplicaFetcherThread-2-4 on partition [XXXXX,4] failed due to Leader not local for partition [XXXXXXX,4] on broker 4 (kafka.server.ReplicaManager).*
> > > > >
> > > >
> > > > In the controller broker's server.log there are ZooKeeper reconnections:
> > > >
> > > > > *INFO Client session timed out, have not heard from server in 5126ms for sessionid 0x54e0aaa9582b8e4, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)*
> > > > > *INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)*
> > > > > *INFO Opening socket connection to server xxxxxxxxxxx. Will not attempt to authenticate using SASL (java.lang.SecurityException: Unable to locate a login configuration) (org.apache.zookeeper.ClientCnxn)*
> > > > > *INFO Socket connection established to xxxxxxxxxxxxx, initiating session (org.apache.zookeeper.ClientCnxn)*
> > > > > *INFO Session establishment complete on server xxxxxxxxxx, sessionid = 0x54e0aaa9582b8e4, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)*
> > > > > *INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)*
> > > > >
> > > > But under /brokers/ids/ in ZooKeeper, the controller broker's id 5 is
> > > > missing.
> > > >
> > > > Then I tried to restart the controller broker and found that the process
> > > > won't quit.
> > > >
> > > > Then I ran jstack on it and found the broker process stuck; some key
> > > > points are pasted below. It seems the zk-client-expired-callback acquired
> > > > the controllerLock and is waiting for the kafka-scheduler Executor to exit
> > > > (for one day), but a thread in that Executor is doing the rebalance job,
> > > > which needs to acquire the controllerLock. So the broker is in a DEADLOCK
> > > > and will be totally lost from ZooKeeper for ONE DAY, if I'm correct? And
> > > > since it still holds an outdated view of the cluster, it will try to
> > > > follow Leaders that may not be the actual Leaders, which caused the ERRORs
> > > > mentioned above.
> > > >
> > > > I'm using 8 Kafka brokers @0.8.2.1 with 3 ZooKeeper servers @3.4.6, all on
> > > > different hosts in the same data center. The cluster load is about 200K
> > > > messages in, 30M bytes in, and 80M bytes out in total.
> > > >
> > > > Has anyone had the same issue? Any suggestion is appreciated.
> > > >
> > > >
> > > > jstack:
> > > >
> > > > > "kafka-scheduler-0" daemon prio=10 tid=0x0000000057967800 nid=0x2994 waiting on condition [0x0000000046dac000]
> > > > >    java.lang.Thread.State: WAITING (parking)
> > > > >         at sun.misc.Unsafe.park(Native Method)
> > > > >         - parking to wait for  <0x00000000c2ec6418> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> > > > >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> > > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> > > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> > > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> > > > >         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> > > > >         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> > > > >         at kafka.utils.Utils$.inLock(Utils.scala:533)
> > > > >         at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1131)
> > > > >         at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1127)
> > > > >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > >         at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1127)
> > > > >         at kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:326)
> > > > >         at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
> > > > >         at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
> > > > >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> > > > >         at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> > > > >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
> > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
> > > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > > > >         at java.lang.Thread.run(Thread.java:662)
> > > > >
> > > > > "ZkClient-EventThread-15-xxxxxxxxxxxxxxxxxxxxx" daemon prio=10 tid=0x00002aaab42ba800 nid=0x24c4 waiting on condition [0x00000000402e8000]
> > > > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > > > >         at sun.misc.Unsafe.park(Native Method)
> > > > >         - parking to wait for  <0x00000000f4c450b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> > > > >         at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1253)
> > > > >         at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88)
> > > > >         at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
> > > > >         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1108)
> > > > >         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> > > > >         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> > > > >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > >         at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1107)
> > > > >         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > > > >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > > > >
> > > >
> > >
> >
>
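
The lock interaction discussed in the quoted thread can be reproduced in miniature. The following is only a sketch under stated assumptions, not Kafka's code: the class ControllerLockSketch, the method resign, and its parameters are hypothetical names, and the short wait stands in for the one-day wait mentioned in the thread. It shows one thread holding a lock while awaiting executor termination, while the executor's task needs that same lock, and then the variant suggested in the thread, where the lock is released before the executor is shut down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class ControllerLockSketch {
    // Returns true if the executor terminated within waitMs milliseconds.
    // holdLockDuringShutdown = true mimics shutting down while holding the lock.
    static boolean resign(boolean holdLockDuringShutdown, long waitMs) {
        final ReentrantLock controllerLock = new ReentrantLock();
        final CountDownLatch taskStarted = new CountDownLatch(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        controllerLock.lock(); // the "resigning" thread takes the lock first
        boolean locked = true;
        try {
            // Stand-in for the rebalance task: it needs the same lock to proceed.
            scheduler.execute(() -> {
                taskStarted.countDown();
                controllerLock.lock();
                try {
                    // rebalance work would happen here
                } finally {
                    controllerLock.unlock();
                }
            });
            taskStarted.await(); // the task is running and about to request the lock

            if (!holdLockDuringShutdown) {
                // Suggested variant: release the lock before shutting down.
                controllerLock.unlock();
                locked = false;
            }
            scheduler.shutdown();
            // While the lock is still held here, the task cannot finish,
            // so this wait can only end by timing out.
            return scheduler.awaitTermination(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            if (locked) {
                controllerLock.unlock(); // lets the blocked task complete afterwards
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated while holding lock: " + resign(true, 200));
        System.out.println("terminated after releasing lock: " + resign(false, 2000));
    }
}
```

In this toy version the stuck case resolves after the short timeout. With a one-day wait, as described in the thread, the same pattern would keep the process wedged for that long.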
