Hi Kishore Senji,

The segment file size is the default 1GB. According to LogManager.scala#cleanupExpiredSegments, Kafka will only delete segments whose lastModTime is older than retention.ms, so I don't think this is the reason for my data loss.
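For reference, the check I mean boils down to something like the sketch below. This is only my illustration of the lastModified-vs-retention.ms comparison, with names of my own choosing, not the actual LogManager code:

    import java.io.File

    // Rough sketch: a segment is eligible for time-based deletion only once its
    // last-modified timestamp is older than the topic's retention.ms.
    def expiredSegments(segmentFiles: Seq[File], retentionMs: Long): Seq[File] = {
      val now = System.currentTimeMillis()
      segmentFiles.filter(f => now - f.lastModified() > retentionMs)
    }

Under that rule a retention change can only expire segments that are already older than the new retention.ms.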
Actually, I lost some data in a topic other than the one whose retention I reduced... I don't know whether destaging these several-GB files could cause this kind of system chattering, though the hardware we use is not very fancy. (I have appended a minimal sketch of the controllerLock/scheduler deadlock from the jstack at the bottom of this mail.)

2015-08-18 7:48 GMT+08:00 Kishore Senji <kse...@gmail.com>:

> What is the size of the segment file? You are reducing the retention from 10 days to 1 day. The moment you do this, it will delete all segments that are older than 1 day. So for example, if your latest segment is older than 1 day and there are consumers which are still catching up (let us say a 10 min lag), Kafka will roll over and delete the older segments and there is potential for data loss. One pattern could be to make sure you change this config parameter only when a new segment is created and all consumers are on the new segment, and also to make sure all clients will be done with the segment before the file is deleted.
>
> My guess is that your segment file is huge and the OS may be taking a long time to destage the file cache onto disk before letting it be deleted. This could be the reason for the long pause that is causing the Zk connections to time out.
>
> On Mon, Aug 17, 2015 at 6:59 AM Zhao Weinan <zhaow...@gmail.com> wrote:
>
> > Hi Kishore Senji,
> >
> > Thanks for the reply.
> >
> > Do you have any suggestions for the time before the fix comes out? Try not to modify retention.ms? Or disable the auto rebalance? This problem is 100% reproducible in my scenario (I hit the deadlock twice in two retention.ms modifications), and I even found some data loss whose cause I'm still looking for.
> >
> > Any idea why shrinking retention.ms makes the network unstable?
> >
> > And yes, I used the commas for clarity :)
> >
> > 2015-08-17 8:59 GMT+08:00 Kishore Senji <kse...@gmail.com>:
> >
> > > Interesting problem you ran into. It seems like this broker was chosen as the Controller and the onControllerFailover() method was called. This will schedule the checkAndTriggerPartitionRebalance method to execute after 5 seconds (when auto rebalance is enabled). In the meantime this broker lost its zookeeper connection, so it resigns from the Controller status and the onControllerResignation() method is called; this method tries to shut down the auto rebalance executor. But it does so while holding the lock, and this is what caused the deadlock in your case.
> > >
> > > I do not think we need to hold the lock to shut down the executor. This could be the fix we need.
> > >
> > > The retention.ms config parameter should not have commas in the value. Are you just using them here to clarify for us?
> > >
> > > It so happened in your
> > >
> > > On Sun, Aug 16, 2015 at 1:52 AM Zhao Weinan <zhaow...@gmail.com> wrote:
> > >
> > > > Hi guys,
> > > >
> > > > I got this problem: after changing one topic's config to retention.ms=86,400,000 from 864,000,000, the brokers start to schedule and do deletions of the outdated indexes of that topic.
> > > >
> > > > Then for some reason some brokers' connections with zookeeper expired, and suddenly lots of ERRORs showed up in logs/server.log. At the controller broker (id=5) they are:
> > > >
> > > > > *ERROR [ReplicaFetcherThread-2-4], Error for partition [XXXXX,4] to broker 4: class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread).*
> > > >
> > > > At the other broker, which the controller broker tries to fetch from, they are:
> > > >
> > > > > *[Replica Manager on Broker 4]: Fetch request with correlation id 1920630 from client ReplicaFetcherThread-2-4 on partition [XXXXX,4] failed due to Leader not local for partition [XXXXXXX,4] on broker 4 (kafka.server.ReplicaManager).*
> > > >
> > > > In the controller broker's server.log there are zk reconnections:
> > > >
> > > > > *INFO Client session timed out, have not heard from server in 5126ms for sessionid 0x54e0aaa9582b8e4, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)*
> > > > > *INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)*
> > > > > *INFO Opening socket connection to server xxxxxxxxxxx. Will not attempt to authenticate using SASL (java.lang.SecurityException: Unable to locate a login configuration) (org.apache.zookeeper.ClientCnxn)*
> > > > > *INFO Socket connection established to xxxxxxxxxxxxx, initiating session (org.apache.zookeeper.ClientCnxn)*
> > > > > *INFO Session establishment complete on server xxxxxxxxxx, sessionid = 0x54e0aaa9582b8e4, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)*
> > > > > *INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)*
> > > >
> > > > But on zookeeper /brokers/ids/ there is no controller broker's id 5.
> > > >
> > > > Then I tried to restart the controller broker and found the process won't quit.
> > > >
> > > > Then I jstacked it and found the broker process is kind of stuck; some key points are pasted below. It seems the zk-client-expired callback acquired the controllerLock and is waiting for the kafka-scheduler executor to exit (for one day), but some thread in that executor is doing the rebalance job, which needs to acquire the controllerLock, so the broker is in a DEADLOCK and will be totally lost from zookeeper for ONE DAY, if I'm correct? And since it still holds an outdated view of the cluster, it will try to follow up Leaders which may not be the actual Leaders, causing the ERRORs mentioned above.
> > > >
> > > > I'm using 8 Kafka brokers @0.8.2.1 with 3 Zookeeper servers @3.4.6, all on different hosts in the same data center; the cluster load is about 200K messages in, 30M bytes in and 80M bytes out in total.
> > > >
> > > > Does someone have the same issue? Any suggestion is appreciated.
> > > >
> > > > jstack:
> > > >
> > > > > "kafka-scheduler-0" daemon prio=10 tid=0x0000000057967800 nid=0x2994 waiting on condition [0x0000000046dac000]
> > > > >    java.lang.Thread.State: WAITING (parking)
> > > > >         at sun.misc.Unsafe.park(Native Method)
> > > > >         - parking to wait for <0x00000000c2ec6418> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> > > > >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> > > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> > > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> > > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> > > > >         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> > > > >         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> > > > >         at kafka.utils.Utils$.inLock(Utils.scala:533)
> > > > >         at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1131)
> > > > >         at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1127)
> > > > >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > >         at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1127)
> > > > >         at kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:326)
> > > > >         at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
> > > > >         at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
> > > > >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> > > > >         at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> > > > >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
> > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
> > > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > > > >         at java.lang.Thread.run(Thread.java:662)
> > > > >
> > > > > "ZkClient-EventThread-15-xxxxxxxxxxxxxxxxxxxxx" daemon prio=10 tid=0x00002aaab42ba800 nid=0x24c4 waiting on condition [0x00000000402e8000]
> > > > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > > > >         at sun.misc.Unsafe.park(Native Method)
> > > > >         - parking to wait for <0x00000000f4c450b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > > > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> > > > >         at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1253)
> > > > >         at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88)
> > > > >         at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
> > > > >         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1108)
> > > > >         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> > > > >         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> > > > >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > >         at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1107)
> > > > >         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > > > >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
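PS: to make the interleaving in the jstack above easier to follow, here is a minimal, self-contained sketch of the same pattern. It is illustrative only: the object, lock and task names are stand-ins of mine, not the actual KafkaController/KafkaScheduler code.

    import java.util.concurrent.{Executors, TimeUnit}
    import java.util.concurrent.locks.ReentrantLock

    // Illustrative stand-ins only; not the real controller/scheduler code.
    object ControllerDeadlockSketch {
      def main(args: Array[String]): Unit = {
        val controllerLock = new ReentrantLock()
        val scheduler = Executors.newSingleThreadScheduledExecutor()

        // The "resignation" path (like handleNewSession) takes the controller lock...
        controllerLock.lock()

        // ...while a "rebalance" task (like checkAndTriggerPartitionRebalance) is
        // queued on the scheduler and needs that same lock before it can do anything.
        scheduler.schedule(new Runnable {
          def run(): Unit = {
            controllerLock.lock() // parks here, like kafka-scheduler-0 in the jstack
            try println("rebalance would run here")
            finally controllerLock.unlock()
          }
        }, 0, TimeUnit.MILLISECONDS)

        try {
          // Still holding the lock, the resignation path shuts the scheduler down and
          // waits for it to terminate. The queued task cannot finish because it is
          // parked on the lock, so this wait can only time out. The real code waits
          // up to a day; 10 seconds is enough to show the hang.
          scheduler.shutdown()
          val finished = scheduler.awaitTermination(10, TimeUnit.SECONDS)
          println(s"scheduler terminated while the lock was held: $finished") // false
        } finally {
          controllerLock.unlock() // only now can the rebalance task run and the JVM exit
        }
      }
    }

If the resignation path stopped the scheduler before taking (or while not holding) the controller lock, the queued rebalance work could finish or be cancelled and the callback thread would not get stuck, which matches Kishore's point that we should not need to hold the lock to shut down the executor.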