I missed the following lines:

    topicStreamsMap.foreach { topicAndStreams =>
      // register on broker partition path changes
      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
      zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
    }
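
For anyone reading this later, here is a rough, self-contained sketch of the idea behind those lines: one listener is subscribed to each topic's broker path, so any change in the brokers registered under that path calls the listener, which is where a rebalance would happen. FakeZk, ChildListener, RecordingRebalanceListener and BrokerChangeDemo are hypothetical names made up for illustration; they are in-memory stand-ins, not the real ZkClient or Kafka classes, and the "/brokers/topics" value is an assumption about the path layout.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a ZooKeeper child listener (not the real ZkClient interface).
trait ChildListener {
  def handleChildChange(parentPath: String, currentChildren: Seq[String]): Unit
}

// Hypothetical in-memory registry that mimics subscribing to child changes on a path.
class FakeZk {
  private val listeners = mutable.Map.empty[String, Vector[ChildListener]]

  def subscribeChildChanges(path: String, listener: ChildListener): Unit =
    listeners(path) = listeners.getOrElse(path, Vector.empty) :+ listener

  // Simulates the set of children under a path changing, e.g. a broker dying or joining.
  def setChildren(path: String, newChildren: Seq[String]): Unit =
    listeners.getOrElse(path, Vector.empty)
      .foreach(_.handleChildChange(path, newChildren))
}

// Records every path that changed; a real listener would trigger a rebalance here.
class RecordingRebalanceListener extends ChildListener {
  val rebalancedFor = mutable.ArrayBuffer.empty[String]

  def handleChildChange(parentPath: String, currentChildren: Seq[String]): Unit =
    rebalancedFor += parentPath
}

object BrokerChangeDemo {
  // Assumed path layout, for illustration only.
  val BrokerTopicsPath = "/brokers/topics"

  def run(): List[String] = {
    val zk = new FakeZk
    val listener = new RecordingRebalanceListener
    val topics = Seq("topicA", "topicB")

    // Same shape as the quoted lines: one subscription per consumed topic.
    topics.foreach { topic =>
      zk.subscribeChildChanges(BrokerTopicsPath + "/" + topic, listener)
    }

    // One broker drops out of topicA's path; only broker "0" remains.
    zk.setChildren(BrokerTopicsPath + "/topicA", Seq("0"))
    listener.rebalancedFor.toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this sketch only the topicA path is reported, because only its children changed; topicB's subscription stays idle until something changes under its path.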

My curiosity was resolved.

On Fri, Dec 14, 2012 at 10:20 PM, Bae, Jae Hyeon <metac...@gmail.com> wrote:
> Hi
>
> When one of the broker instances dies, I can see that the producer
> detects the dead broker and refreshes its producer pool so that it
> does not send data to the dead broker. But I can't find any
> corresponding code in ZookeeperConsumerConnector. My guess is that
> ZookeeperConsumerConnector should do a syncRebalance when a Kafka
> broker dies or a new Kafka broker is registered.
>
> Can you give me a guideline on how ZookeeperConsumerConnector behaves
> when a broker is interrupted?
>
> Thank you
> Best, Jae