I missed the following lines:

topicStreamsMap.foreach { topicAndStreams =>
  // register on broker partition path changes
  val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
  zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
}

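That answers my question: ZookeeperConsumerConnector subscribes
loadBalancerListener to child changes under each topic's broker path, so
the listener fires when the set of brokers registered for a topic changes.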
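
For anyone reading this thread later, here is a minimal sketch of the mechanism behind the lines quoted above. It does not use Kafka or ZkClient; FakeZk, ChildListener and RebalanceSketch are hypothetical names I made up, and the "/brokers/topics" value and the broker ids are assumptions for illustration only. It just shows that a listener subscribed on a per-topic path is called every time a child under that path is added or removed, which is the hook a rebalance can hang off.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a child-change callback; not the ZkClient interface.
trait ChildListener {
  def handleChildChange(parentPath: String, currentChildren: Seq[String]): Unit
}

// Hypothetical in-memory replacement for ZooKeeper child watches.
class FakeZk {
  private val children = mutable.Map.empty[String, mutable.LinkedHashSet[String]]
  private val listeners = mutable.Map.empty[String, mutable.ListBuffer[ChildListener]]

  def subscribeChildChanges(path: String, listener: ChildListener): Unit = {
    val registered = listeners.getOrElseUpdate(path, mutable.ListBuffer.empty[ChildListener])
    registered += listener
  }

  // Simulates a broker registering itself under a topic path.
  def addChild(path: String, child: String): Unit = {
    val current = children.getOrElseUpdate(path, mutable.LinkedHashSet.empty[String])
    current += child
    fire(path)
  }

  // Simulates a broker's node disappearing when the broker dies.
  def removeChild(path: String, child: String): Unit = {
    children.get(path).foreach(set => set -= child)
    fire(path)
  }

  // Notifies every listener on the path with the current list of children.
  private def fire(path: String): Unit = {
    val current = children.get(path).map(_.toList).getOrElse(Nil)
    listeners.get(path).foreach(ls => ls.foreach(_.handleChildChange(path, current)))
  }
}

object RebalanceSketch {
  // Assumed value, for illustration only.
  val BrokerTopicsPath = "/brokers/topics"

  // Returns one entry per notification the listener received.
  def simulate(): List[String] = {
    val zk = new FakeZk
    val notifications = mutable.ListBuffer.empty[String]

    // A real listener would trigger a rebalance here; this one only records the call.
    val loadBalancerListener = new ChildListener {
      def handleChildChange(parentPath: String, currentChildren: Seq[String]): Unit = {
        notifications += (parentPath + " -> " + currentChildren.mkString(","))
      }
    }

    val topicStreamsMap = Map("events" -> 1)
    topicStreamsMap.foreach { topicAndStreams =>
      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
      zk.subscribeChildChanges(partitionPath, loadBalancerListener)
    }

    zk.addChild(BrokerTopicsPath + "/events", "0")    // broker 0 registers
    zk.addChild(BrokerTopicsPath + "/events", "1")    // broker 1 registers
    zk.removeChild(BrokerTopicsPath + "/events", "0") // broker 0 dies
    notifications.toList
  }

  def main(args: Array[String]): Unit = simulate().foreach(println)
}
```

In this sketch the listener is called three times, once for each change to the topic's children, and a change on a path nobody subscribed to would not reach it.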

On Fri, Dec 14, 2012 at 10:20 PM, Bae, Jae Hyeon <metac...@gmail.com> wrote:
> Hi
>
> When one of the broker instances dies, I can see that the producer
> detects the dead broker and refreshes its producer pool so that it
> stops sending data to that broker. But I can't find any equivalent
> code in ZookeeperConsumerConnector. My guess is that
> ZookeeperConsumerConnector should do syncRebalance when a Kafka broker
> dies or a new Kafka broker is registered.
>
> Can you explain how ZookeeperConsumerConnector behaves when a broker
> goes down or a new one comes up?
>
> Thank you
> Best, Jae
