Is it always the case that there is only one fetcher per broker? Won’t
setting num.replica.fetchers greater than the number of brokers cause more
fetchers per broker?
Let’s say I have 5 brokers and num.replica.fetchers is 8: will there be 2
fetcher threads pulling from each broker?

Thanks
Zakee
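
A rough Java model of the question above, not Kafka source code. It assumes (an unverified reading of the fetcher manager) that a fetcher thread is identified by the pair (source broker, fetcher id), and that the fetcher id is a non-negative hash of topic and partition taken modulo the configured fetcher count, e.g. num.replica.fetchers. FetcherModel, fetcherId and fetcherIdsFor are hypothetical names for this sketch.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model, not Kafka source code.
// ASSUMPTION: a fetcher thread is keyed by (source broker, fetcher id), and the
// fetcher id is hash(topic, partition) modulo the configured fetcher count.
final class FetcherModel {
    static int fetcherId(String topic, int partition, int numFetchers) {
        // Clear the sign bit so the hash is non-negative before the modulo.
        int hash = (31 * topic.hashCode() + partition) & 0x7fffffff;
        return hash % numFetchers;
    }

    // Distinct fetcher ids (i.e. threads) needed for the partitions of `topic`
    // that one source broker leads.
    static Set<Integer> fetcherIdsFor(String topic, int numFetchers, int... partitions) {
        Set<Integer> ids = new HashSet<>();
        for (int p : partitions) {
            ids.add(fetcherId(topic, p, numFetchers));
        }
        return ids;
    }

    public static void main(String[] args) {
        // A broker leading one partition: one thread, however large numFetchers is.
        System.out.println(fetcherIdsFor("orders", 8, 0).size()); // 1
        // A broker leading eight partitions of one topic: eight threads for that broker.
        System.out.println(fetcherIdsFor("orders", 8, 0, 1, 2, 3, 4, 5, 6, 7).size()); // 8
    }
}
```

If that assumption holds, the thread count for a source broker depends on how many partitions it leads, capped at the fetcher count, rather than the fetchers being divided evenly across brokers.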



> On Mar 12, 2015, at 11:15 AM, James Cheng <jch...@tivo.com> wrote:
> 
> Ah, I understand now. I didn't realize that there was one fetcher thread per 
> broker. 
> 
> Thanks Tao & Guozhang!
> -James
> 
> 
> On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao...@gmail.com 
> <mailto:xiaotao...@gmail.com>> wrote:
> 
>> The fetcher thread works on a per-broker basis: there is at least one
>> fetcher thread per broker. The fetcher thread sends the broker a fetch
>> request that asks for all of its partitions on that broker. So if A, B and
>> C are on the same broker, the fetcher thread is still able to fetch data
>> from B and C even though A returns no data. The same logic applies to the
>> other brokers.
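
A toy Java model of what Tao describes: one fetch request names every partition the fetcher owns on a broker, and the broker answers each partition separately, so an empty partition simply contributes an empty result. This is a sketch, not Kafka's wire protocol; MultiPartitionFetch, fetch, demo and the in-memory broker logs are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Kafka's protocol: one request lists every partition the
// fetcher owns on a broker, and the broker answers each partition separately.
final class MultiPartitionFetch {
    // brokerLogs: hypothetical in-memory log per partition on one broker.
    // requestedOffsets: partition -> next offset the fetcher wants.
    static Map<String, List<String>> fetch(Map<String, List<String>> brokerLogs,
                                           Map<String, Integer> requestedOffsets) {
        Map<String, List<String>> response = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : requestedOffsets.entrySet()) {
            List<String> log = brokerLogs.getOrDefault(entry.getKey(), List.of());
            int from = Math.min(entry.getValue(), log.size());
            // An empty partition yields an empty list; it does not hold up the others.
            response.put(entry.getKey(), new ArrayList<>(log.subList(from, log.size())));
        }
        return response;
    }

    // Partition A is empty, B and C have data, all on the same broker.
    static Map<String, List<String>> demo() {
        Map<String, List<String>> logs = new LinkedHashMap<>();
        logs.put("A", List.of());
        logs.put("B", List.of("b0", "b1"));
        logs.put("C", List.of("c0"));
        Map<String, Integer> offsets = new LinkedHashMap<>();
        offsets.put("A", 0);
        offsets.put("B", 0);
        offsets.put("C", 0);
        return fetch(logs, offsets);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {A=[], B=[b0, b1], C=[c0]}
    }
}
```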
>> 
>> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:
>> 
>>> 
>>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>> 
>>>> Hi James,
>>>> 
>>>> What I meant before is that a single fetcher may be responsible for
>>>> putting fetched data into multiple queues, depending on how the streams
>>>> are set up, where each queue may be consumed by a different thread. The
>>>> queues are also bounded. Now say two queues are getting data from the
>>>> same fetcher F and are consumed by two different user threads, A and B.
>>>> If thread A for some reason slows down or hangs while consuming data
>>>> from queue 1, then queue 1 will eventually fill up, and F will block when
>>>> it tries to put more data into it. Since F is parked trying to put data
>>>> into queue 1, queue 2 will not get more data from it, and thread B may
>>>> hence get starved. Does that make sense now?
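
A deterministic Java sketch of the starvation Guozhang describes, using two bounded java.util.concurrent queues fed by one fetcher loop. So that it cannot hang, the model fetcher uses offer() and stops at the point where a blocking put() would park it. SharedFetcherStall and deliveredToQueue2 are hypothetical names, and the alternating fill order is an assumption made for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Deterministic model of one fetcher F feeding two bounded queues in turn.
// Queue 1 belongs to thread A, which has hung and never drains it; queue 2
// belongs to thread B, which drains it immediately.
final class SharedFetcherStall {
    // Returns how many messages reach B before F would be parked.
    static int deliveredToQueue2(int capacity, int rounds) {
        BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<>(capacity);
        BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<>(capacity);
        int delivered = 0;
        for (int i = 0; i < rounds; i++) {
            // A real fetcher would call put() here and block; offer() returns
            // false instead, which marks the point where F would be parked.
            if (!queue1.offer(i)) {
                break; // F is stuck on queue 1, so queue 2 gets nothing more
            }
            queue2.offer(i);
            queue2.poll(); // B consumes right away, so queue 2 never fills
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliveredToQueue2(5, 100)); // 5
    }
}
```

With a capacity of 5 and 100 rounds, B receives only 5 messages even though it drains its own queue instantly, because F never gets past the full queue 1.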
>>>> 
>>> 
>>> Yes, that makes sense. That is the scenario where one thread of a consumer
>>> can cause a backup in the queue, which would cause other threads to not
>>> receive data.
>>> 
>>> What about the situation I described, where a thread consumes a queue that
>>> is supposed to be filled with messages from multiple partitions? If
>>> partition A has no messages and partitions B and C do, how will the fetcher
>>> behave? Will the processing thread receive messages from partitions B and C?
>>> 
>>> Thanks,
>>> -James
>>> 
>>> 
>>>> Guozhang
>>>> 
>>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
>>>> 
>>>>> Hi,
>>>>> 
>>>>> Sorry to bring up this old thread, but my question is about this exact
>>>>> thing:
>>>>> 
>>>>> Guozhang, you said:
>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
>>>>>> BC: 6 partitions.
>>>>>>
>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads
>>>>>> will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and BC-4/5/6
>>>>>> respectively;
>>>>>>
>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will
>>>>>> be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and AC-3/BC-5/BC-6
>>>>>> respectively.
>>>>> 
>>>>> 
>>>>> You said that in the createMessageStreamsByFilter case, if topic AC had
>>>>> no messages in it and consumer.timeout.ms = -1, then the 3 threads might
>>>>> all be blocked waiting for data to arrive from topic AC, and so messages
>>>>> from BC would not be processed.
>>>>> 
>>>>> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
>>>>> same problem, just worse. Behind the scenes, is there a single thread
>>>>> that consumes messages (round-robin?) from the different partitions and
>>>>> inserts them all into a single queue for the application code to
>>>>> process? And is that why a single partition with no messages will block
>>>>> the other messages from getting through?
>>>>> 
>>>>> What about createMessageStreams("AC" => 1)? That creates a single stream
>>>>> that contains messages from multiple partitions, which might be on
>>>>> different brokers. Does that also suffer from the same problem, where if
>>>>> one partition has no messages, the application would not receive
>>>>> messages from the other partitions?
>>>>> 
>>>>> Thanks,
>>>>> -James
>>>>> 
>>>>> 
>>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>> 
>>>>>> The new consumer will be released in 0.9, which is targeted for the end
>>>>>> of this quarter.
>>>>>> 
>>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>> 
>>>>>>> Do you know when the new consumer API will be publicly available?
>>>>>>> 
>>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Yes, it can get stuck. For example, if AC and BC are processed by two
>>>>>>>> different processes and the AC processor gets stuck, AC messages will
>>>>>>>> fill up the consumer's buffer and eventually prevent the fetcher
>>>>>>>> thread from putting more data into it; the fetcher thread will be
>>>>>>>> blocked on that and not be able to fetch BC.
>>>>>>>> 
>>>>>>>> This issue has been addressed in the new consumer client, which is
>>>>>>>> single-threaded with non-blocking APIs.
>>>>>>>> 
>>>>>>>> Guozhang
>>>>>>>> 
>>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Thank you Guozhang for your detailed explanation. In your example
>>>>>>>>> createMessageStreamsByFilter("*C" => 3), since threads are shared
>>>>>>>>> among topics, there may be a situation where all 3 threads get stuck
>>>>>>>>> on topic AC (e.g. the topic is empty), which will hold the consumer
>>>>>>>>> threads (with consumer.timeout.ms=-1), so there is no thread left to
>>>>>>>>> serve topic BC. Do you think this situation will happen?
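
A Java sketch of the consumer.timeout.ms behaviour under discussion, modelled with a BlockingQueue: a negative timeout waits forever via take(), while a non-negative one gives up after the interval and throws. StreamTimeoutModel, next and StreamTimeoutException are hypothetical stand-ins for a stream's iterator and the old consumer's timeout exception; they are not Kafka classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class StreamTimeoutModel {
    // Hypothetical stand-in for the old consumer's timeout exception.
    static final class StreamTimeoutException extends RuntimeException {
        StreamTimeoutException(String message) { super(message); }
    }

    // Next message from a stream's queue, honouring a consumer.timeout.ms-style value.
    static <T> T next(BlockingQueue<T> queue, long timeoutMs) {
        try {
            if (timeoutMs < 0) {
                // -1: wait forever. If nothing ever arrives, this thread stays parked.
                return queue.take();
            }
            T item = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (item == null) {
                throw new StreamTimeoutException("no message within " + timeoutMs + " ms");
            }
            return item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting for a message", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        System.out.println(next(queue, 100)); // m1
        try {
            next(queue, 100);
        } catch (StreamTimeoutException e) {
            // With a finite timeout the thread gets control back and can do other work.
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```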
>>>>>>>>> 
>>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> I was not clear before: for createMessageStreamsByFilter each
>>>>>>>>>> matched topic will have num-threads, but shared, i.e. there will be
>>>>>>>>>> num-threads created in total, but each thread will be responsible
>>>>>>>>>> for fetching all matched topics.
>>>>>>>>>> 
>>>>>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
>>>>>>>>>> BC: 6 partitions.
>>>>>>>>>>
>>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads
>>>>>>>>>> will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and BC-4/5/6
>>>>>>>>>> respectively;
>>>>>>>>>>
>>>>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
>>>>>>>>>> will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and
>>>>>>>>>> AC-3/BC-5/BC-6 respectively.
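
A Java sketch of a range-style split that reproduces the two examples above, assuming each topic's partitions are divided into contiguous ranges over the streams, with the first (partitions % streams) streams taking one extra partition. StreamAssignment.range is a hypothetical helper, and partition ids here are 0-based while the email numbers them from 1. The last line also matches Tao's log further down the thread: with 2 shared streams, kafkatopic-1 has a single partition, so stream 1 gets nothing for that topic.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a range-style split of one topic's partitions over N streams.
// ASSUMPTION: contiguous ranges, the first (partitions % streams) streams get one extra.
final class StreamAssignment {
    // For each stream index, the 0-based partition ids it owns.
    static List<List<Integer>> range(int partitions, int streams) {
        List<List<Integer>> result = new ArrayList<>();
        int perStream = partitions / streams;
        int extra = partitions % streams;
        int next = 0;
        for (int s = 0; s < streams; s++) {
            int count = perStream + (s < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // createMessageStreams("AC" => 3, "BC" => 2): each topic gets its own streams, 5 in total.
        System.out.println("AC: " + range(3, 3)); // AC: [[0], [1], [2]]
        System.out.println("BC: " + range(6, 2)); // BC: [[0, 1, 2], [3, 4, 5]]
        // createMessageStreamsByFilter("*C" => 3): the same 3 streams serve every matched topic.
        System.out.println("AC: " + range(3, 3)); // AC: [[0], [1], [2]]
        System.out.println("BC: " + range(6, 3)); // BC: [[0, 1], [2, 3], [4, 5]]
        // Tao's log: 2 shared streams, kafkatopic-1 has one partition, so stream 1 is idle for it.
        System.out.println("kafkatopic-1: " + range(1, 2)); // kafkatopic-1: [[0], []]
    }
}
```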
>>>>>>>>>> 
>>>>>>>>>> Guozhang
>>>>>>>>>> 
>>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Guozhang,
>>>>>>>>>>> 
>>>>>>>>>>> Do you mean that each regex-matched topic owns the number of threads
>>>>>>>>>>> passed in to createMessageStreamsByFilter? For example, in the code
>>>>>>>>>>> below, if I have 3 matched topics, each of which has 2 partitions,
>>>>>>>>>>> then I should have 3 * 2 = 6 threads in total, with each topic
>>>>>>>>>>> owning 2 threads.
>>>>>>>>>>> 
>>>>>>>>>>> TopicFilter filter = new Whitelist(".*");
>>>>>>>>>>> 
>>>>>>>>>>> int threadTotal = 2;
>>>>>>>>>>> 
>>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
>>>>>>>>>>> .createMessageStreamsByFilter(filter, threadTotal);
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> But what I observed in the log is different:
>>>>>>>>>>> 
>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>> 
>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>>>>> partition 1
>>>>>>>>>>> 
>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>> partition 0
>>>>>>>>>>> 
>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>> 
>>>>>>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
>>>>>>>>>>> partitions consumed by consumer thread
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>>>>>>>>>>> 
>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>> partition 0
>>>>>>>>>>> 
>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>> 
>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>>>>> partition 1
>>>>>>>>>>> 
>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>> partition 0
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> As you can see from the log, only 2 threads are created, and they are
>>>>>>>>>>> shared among 3 topics. With this setting I think the parallelism is
>>>>>>>>>>> degraded, and a slow topic may impact the consumption performance of
>>>>>>>>>>> other topics. Any thoughts?
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> createMessageStreams is used for consuming from specific topic(s),
>>>>>>>>>>>> where you pass a map of [topic-name, num-threads] as its input
>>>>>>>>>>>> parameter;
>>>>>>>>>>>>
>>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard
>>>>>>>>>>>> topics, where you pass a (regex, num-threads) pair as its input
>>>>>>>>>>>> parameters, and for each regex-matched topic num-threads will be
>>>>>>>>>>>> created.
>>>>>>>>>>>>
>>>>>>>>>>>> The difference between these two is not really about throughput /
>>>>>>>>>>>> latency, but rather about consumption semantics.
>>>>>>>>>>>> 
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi team,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I am comparing the differences between
>>>>>>>>>>>>> ConsumerConnector.createMessageStreams and
>>>>>>>>>>>>> ConsumerConnector.createMessageStreamsByFilter. My understanding
>>>>>>>>>>>>> is that createMessageStreams creates x threads (x is the number
>>>>>>>>>>>>> of threads passed in to the method) dedicated to the specified
>>>>>>>>>>>>> topic, while createMessageStreamsByFilter creates x threads
>>>>>>>>>>>>> shared by the topics specified by the TopicFilter. Is that
>>>>>>>>>>>>> correct?
>>>>>>>>>>>>>
>>>>>>>>>>>>> If this is the case, I assume createMessageStreams is the
>>>>>>>>>>>>> preferred way to create streams for each topic if I have high
>>>>>>>>>>>>> throughput and low latency demands. Is my assumption correct?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Tao
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> Regards,
>>>>>>>>>>> Tao
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> Regards,
>>>>>>>>> Tao
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Regards,
>>>>>>> Tao
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> -- Guozhang
>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> --
>>>> -- Guozhang
>>> 
>>> 
>> 
>> 
>> -- 
>> Regards,
>> Tao
> 