Hi,

as far as I know, a rebalance is triggered by Kafka in order to distribute
partitions evenly, that is, to achieve the opposite of what you are seeing.
It would be worth checking the Kafka logs for the outcome of the rebalance
operation to understand why you see this behavior. I know that the client
logs state which partitions of a topic were assigned to a particular
consumer, so maybe you can have a look there.
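If the rebalance itself keeps failing (as with the ZooKeeper NoNode issue
quoted below), it may also help to tune the retry behavior of the 0.8
high-level consumer. A minimal sketch of the relevant consumer properties;
the values here are illustrative, not recommendations:

```properties
# Kafka 0.8 high-level consumer settings (illustrative values)
# Number of rebalance attempts before the consumer gives up (default 4)
rebalance.max.retries=8
# Backoff between rebalance attempts, in ms (default 2000)
rebalance.backoff.ms=5000
# Longer ZooKeeper sessions reduce spurious rebalances (default 6000 ms)
zookeeper.session.timeout.ms=10000
```

Raising the retries and backoff buys the consumer group more time to settle
before a rebalance is declared failed.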

Tobias


On Fri, Jul 18, 2014 at 11:42 PM, Chen Song <chen.song...@gmail.com> wrote:

> Speaking of this, I have another related question.
>
> In my spark streaming job, I set up multiple consumers to receive data
> from Kafka, with each worker reading from one partition.
>
> Initially, Spark is intelligent enough to associate each worker with one
> partition, so data consumption is distributed. After running for a while,
> the consumers rebalance themselves and some workers start reading
> partitions that were previously assigned to others. This leads to a
> situation where some workers read from multiple partitions and some don't
> read at all. Because of the data volume, this causes heap pressure on some
> workers.
>
> Any thoughts on why rebalance is triggered and how to monitor to avoid
> that?
>
>
>
>
> On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer <t...@preferred.jp> wrote:
>
>> Hi,
>>
>> unfortunately, when I go with the above approach, I run into this problem:
>>
>> http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E
>> That is, a NoNode error in Zookeeper when rebalancing. The Kafka
>> receiver will retry again and again, but will eventually fail, leading to
>> unprocessed data and, worse, the task never terminating. There is nothing
>> exotic about my setup; one Zookeeper node, one Kafka broker. So I am
>> wondering if other people have seen this error before and, more
>> importantly, how to fix it. When I don't use the approach of multiple
>> kafkaStreams, I don't get this error, but then work is never distributed
>> across my cluster...
>>
>> Thanks
>> Tobias
>>
>>
>> On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer <t...@preferred.jp>
>> wrote:
>>
>>> Thank you very much for the link, that was very helpful!
>>>
>>> So, apparently the `topics: Map[String, Int]` parameter controls the
>>> number of partitions that the data is initially added to; the number N in
>>>
>>>   val kafkaInputs = (1 to N).map { _ =>
>>>     ssc.kafkaStream(zkQuorum, groupId, Map("topic" -> 1))
>>>   }
>>>   val union = ssc.union(kafkaInputs)
>>>
>>> controls how many connections are made to Kafka. Note that the number of
>>> Kafka partitions for that topic must be at least N for this to work.
>>>
>>> Thanks
>>> Tobias
>>>
>>
>>
>
>
> --
> Chen Song
>
>
