Hi,

as far as I know, a rebalance is triggered by Kafka in order to distribute partitions evenly across consumers, that is, to achieve the opposite of what you are seeing. I think it would be interesting to check the Kafka logs for the result of the rebalance operation and for hints on why you see this behavior. The client logs also state which partitions of a topic were assigned to this particular consumer; maybe you can have a look there.
Tobias

On Fri, Jul 18, 2014 at 11:42 PM, Chen Song <chen.song...@gmail.com> wrote:

> Speaking of this, I have another related question.
>
> In my Spark Streaming job, I set up multiple consumers to receive data
> from Kafka, with each worker reading from one partition.
>
> Initially, Spark is intelligent enough to associate each worker with one
> partition, making data consumption distributed. After running for a while,
> the consumers rebalance themselves and some workers start reading
> partitions that were previously held by others. This leads to a situation
> where some workers read from multiple partitions and some don't read at
> all. Because of the data volume, this causes heap pressure on some
> workers.
>
> Any thoughts on why the rebalance is triggered and how to monitor and
> avoid that?
>
> On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer <t...@preferred.jp> wrote:
>
>> Hi,
>>
>> unfortunately, when I take the above approach, I run into this problem:
>> http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E
>>
>> That is, a NoNode error in ZooKeeper when rebalancing. The Kafka
>> receiver will retry again and again, but will eventually fail, leading
>> to unprocessed data and, worse, the task never terminating. There is
>> nothing exotic about my setup: one ZooKeeper node, one Kafka broker. So
>> I am wondering if other people have seen this error before and, more
>> importantly, how to fix it. When I don't use the approach of multiple
>> kafkaStreams, I don't get this error, but work is also never distributed
>> in my cluster...
>>
>> Thanks
>> Tobias
>>
>> On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer <t...@preferred.jp>
>> wrote:
>>
>>> Thank you very much for the link, that was very helpful!
>>>
>>> So, apparently the `topics: Map[String, Int]` parameter controls the
>>> number of partitions that the data is initially added to; the number N
>>> in
>>>
>>>     val kafkaInputs = (1 to N).map { _ =>
>>>       ssc.kafkaStream(zkQuorum, groupId, Map("topic" -> 1))
>>>     }
>>>     val union = ssc.union(kafkaInputs)
>>>
>>> controls how many connections are made to Kafka. Note that the number
>>> of Kafka partitions for that topic must be at least N for this to work.
>>>
>>> Thanks
>>> Tobias
>
> --
> Chen Song
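P.S.: For reference, here is a minimal, self-contained sketch of the multi-receiver pattern quoted above, using the `ssc.kafkaStream` signature from that snippet. The topic name, ZooKeeper quorum, group id, and batch interval are placeholders, and it assumes a Spark Streaming version where this API is available; it needs a running Kafka broker and ZooKeeper to actually consume anything.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MultiReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-multi-receiver")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // One receiver per Kafka partition. N must not exceed the topic's
    // partition count, or some receivers will sit idle.
    val N = 4
    val kafkaInputs = (1 to N).map { _ =>
      ssc.kafkaStream(
        "zkhost:2181",       // ZooKeeper quorum (placeholder)
        "my-consumer-group", // consumer group id (placeholder)
        Map("my-topic" -> 1) // one consumer thread per stream
      )
    }

    // Union the N streams so downstream operations see a single DStream.
    val union = ssc.union(kafkaInputs)
    union.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Each stream in `kafkaInputs` becomes its own receiver (and thus its own Kafka consumer in the group), which is what spreads the partitions across workers; the union merges them back for processing.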