Ah, I understand now. I didn't realize that there was one fetcher thread per 
broker. 

Thanks Tao & Guozhang!
-James
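
For reference, the starvation scenario discussed in this thread (one fetcher F feeding two bounded queues, with thread A hung on queue 1) can be sketched in a few lines of Java. This is only a single-threaded simulation, not Kafka's fetcher code: the class name, the queue capacity, and the use of a non-blocking offer() to mark the point where a blocking put() would park are all assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: simulates one fetcher feeding two bounded queues.
class FetcherStarvationSketch {

    /** Returns how many messages thread B received before the fetcher stalled on queue 1. */
    static int run(int capacity, int rounds) {
        // queue1 is read by thread A, which is hung and never takes anything.
        BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(capacity);
        // queue2 is read by thread B, which keeps up with the fetcher.
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(capacity);
        int deliveredToB = 0;
        for (int i = 0; i < rounds; i++) {
            // offer() returns false exactly where a blocking put() would park the fetcher.
            if (!queue1.offer("q1-" + i)) {
                break; // the fetcher is stuck on queue 1; queue 2 gets nothing more
            }
            queue2.offer("q2-" + i);
            // Thread B drains its queue immediately.
            if (queue2.poll() != null) {
                deliveredToB++;
            }
        }
        return deliveredToB;
    }

    public static void main(String[] args) {
        System.out.println("messages seen by B before starvation: " + run(2, 10));
    }
}
```

With a capacity of 2, thread B sees only 2 of the 10 messages before F stalls, even though B itself never falls behind.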


On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao...@gmail.com> wrote:

> Fetcher threads are allocated on a per-broker basis: the consumer ensures
> that there is at least one fetcher thread per broker. Each fetcher thread
> sends its broker a fetch request asking for all of its partitions on that
> broker. So if A, B and C are on the same broker, the fetcher thread is still
> able to fetch data for A, B and C even though A returns no data. The
> same logic applies to the other brokers.
> 
> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:
> 
>> 
>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> 
>>> Hi James,
>>> 
>>> What I meant before is that a single fetcher may be responsible for putting
>>> fetched data into multiple queues, according to the construction of the
>>> streams setup, where each queue may be consumed by a different thread. And
>>> the queues are actually bounded. Now say there are two queues that are
>>> getting data from the same fetcher F, and are consumed by two different
>>> user threads A and B. If thread A for some reason slows down or hangs
>>> while consuming data from queue 1, then queue 1 will eventually fill up, and
>>> F will block when it tries to put more data into it. Since F is parked
>>> trying to put data into queue 1, queue 2 will not get more data from it, and
>>> thread B may hence be starved. Does that make sense now?
>>> 
>> 
>> Yes, that makes sense. That is the scenario where one thread of a consumer
>> can cause a backup in the queue, which would cause other threads to not
>> receive data.
>> 
>> What about the situation I described, where a thread consumes a queue that
>> is supposed to be filled with messages from multiple partitions? If
>> partition A has no messages and partitions B and C do, how will the fetcher
>> behave? Will the processing thread receive messages from partitions B and C?
>> 
>> Thanks,
>> -James
>> 
>> 
>>> Guozhang
>>> 
>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Sorry to bring up this old thread, but my question is about this exact
>>>> thing:
>>>> 
>>>> Guozhang, you said:
>>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
>>>>> partitions.
>>>>>
>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>>>>
>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
>>>>> respectively.
>>>> 
>>>> 
>>>> You said that in the createMessageStreamsByFilter case, if topic AC had no
>>>> messages in it and consumer.timeout.ms = -1, then the 3 threads might all
>>>> be blocked waiting for data to arrive from topic AC, and so messages from
>>>> BC would not be processed.
>>>>
>>>> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
>>>> same problem, only worse. Behind the scenes, is there a single thread
>>>> that is consuming (round-robin?) messages from the different partitions and
>>>> inserting them all into a single queue for the application code to process?
>>>> And is that why a single partition with no messages will block the other
>>>> messages from getting through?
>>>>
>>>> What about createMessageStreams("AC" => 1)? That creates a single stream
>>>> that contains messages from multiple partitions, which might be on
>>>> different brokers. Does that also suffer from the same problem, where if
>>>> one partition has no messages, the application would not receive messages
>>>> from the other partitions?
>>>> 
>>>> Thanks,
>>>> -James
>>>> 
>>>> 
>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>> 
>>>>> The new consumer will be released in 0.9, which is targeted for end of
>>>>> this quarter.
>>>>>
>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>> 
>>>>>> Do you know when the new consumer API will be publicly available?
>>>>>> 
>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Yes, it can get stuck. For example, suppose AC and BC are processed by two
>>>>>>> different processes and the AC processor gets stuck. AC messages will
>>>>>>> then fill up the consumer's buffer and eventually prevent the fetcher
>>>>>>> thread from putting more data into it; the fetcher thread will be blocked
>>>>>>> on that and will not be able to fetch BC.
>>>>>>>
>>>>>>> This issue has been addressed in the new consumer client, which is
>>>>>>> single-threaded with non-blocking APIs.
>>>>>>> 
>>>>>>> Guozhang
>>>>>>> 
>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you Guozhang for your detailed explanation. In your example
>>>>>>>> createMessageStreamsByFilter("*C" => 3), since threads are shared among
>>>>>>>> topics, there may be a situation where all 3 threads get stuck on
>>>>>>>> topic AC, e.g. because the topic is empty, which will hold up those
>>>>>>>> threads (with consumer.timeout.ms=-1), so there is no thread left to
>>>>>>>> serve topic BC. Do you think this situation will happen?
>>>>>>>> 
>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I was not clear before: for createMessageStreamsByFilter each matched
>>>>>>>>> topic will have num-threads, but shared, i.e. num-threads threads will
>>>>>>>>> be created in total, and each thread will be responsible for fetching
>>>>>>>>> all matched topics.
>>>>>>>>>
>>>>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
>>>>>>>>> partitions.
>>>>>>>>>
>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
>>>>>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>>>>>>>>
>>>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
>>>>>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
>>>>>>>>> respectively.
>>>>>>>>> 
>>>>>>>>> Guozhang
>>>>>>>>> 
>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Guozhang,
>>>>>>>>>>
>>>>>>>>>> Do you mean that each regex-matched topic owns the number of threads that
>>>>>>>>>> gets passed in to createMessageStreamsByFilter? For example, in the code
>>>>>>>>>> below, if I have 3 matched topics, each of which has 2 partitions, then I
>>>>>>>>>> should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
>>>>>>>>>> 
>>>>>>>>>> TopicFilter filter = new Whitelist(".*");
>>>>>>>>>> 
>>>>>>>>>> int threadTotal = 2;
>>>>>>>>>> 
>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
>>>>>>>>>> .createMessageStreamsByFilter(filter, threadTotal);
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> But what I observed from the log is different
>>>>>>>>>> 
>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
>>>>>>> following
>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with
>>>>>> consumers:
>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>> 
>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>>>> partition 1
>>>>>>>>>> 
>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>> partition 0
>>>>>>>>>> 
>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
>>>>>>> following
>>>>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>> 
>>>>>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
>>>>>>>>>> partitions consumed by consumer thread
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic
>>>>>> kafkatopic-1
>>>>>>>>>> 
>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>> partition 0
>>>>>>>>>> 
>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
>>>>>>> following
>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with
>>>>>> consumers:
>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>> 
>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>>>> partition 1
>>>>>>>>>> 
>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>> partition 0
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> As you can see from the log, only 2 threads are created and shared
>>>>>>>>>> among the 3 topics. With this setting I think the parallelism is degraded
>>>>>>>>>> and a slow topic may impact other topics' consumption performance. Any
>>>>>>>>>> thoughts?
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> createMessageStreams is used for consuming from specific topic(s), where
>>>>>>>>>>> you can pass a map of [topic-name, num-threads] as its input parameter;
>>>>>>>>>>>
>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard topics,
>>>>>>>>>>> where you can pass (regex, num-threads) as its input parameters, and
>>>>>>>>>>> for each regex-matched topic num-threads will be created.
>>>>>>>>>>>
>>>>>>>>>>> The difference between these two is not really about throughput /
>>>>>>>>>>> latency, but rather about consumption semantics.
>>>>>>>>>>> 
>>>>>>>>>>> Guozhang
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi team,
>>>>>>>>>>>>
>>>>>>>>>>>> I am comparing ConsumerConnector.createMessageStreams
>>>>>>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My understanding is
>>>>>>>>>>>> that createMessageStreams creates x threads (x is the number of
>>>>>>>>>>>> threads passed in to the method) dedicated to the specified topic,
>>>>>>>>>>>> while createMessageStreamsByFilter creates x threads shared by the
>>>>>>>>>>>> topics specified by the TopicFilter. Is that correct?
>>>>>>>>>>>>
>>>>>>>>>>>> If this is the case, I assume createMessageStreams is the preferred way
>>>>>>>>>>>> to create streams for each topic if I have high-throughput and
>>>>>>>>>>>> low-latency demands. Is my assumption correct?
>>>>>>>>>>>> 
>>>>>>>>>>>> --
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Tao
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> Regards,
>>>>>>>>>> Tao
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> Regards,
>>>>>>>> Tao
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Regards,
>>>>>> Tao
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> -- Guozhang
>>>> 
>>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>> 
>> 
> 
> 
> -- 
> Regards,
> Tao
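
The thread groupings in Guozhang's AC/BC example above can be reproduced with a small simulation. This is a sketch under stated assumptions, not the consumer's actual rebalancing code: it assumes each topic's partitions are split into contiguous ranges across the available threads, numbers partitions from 1 to match the example, and uses a hypothetical class and method name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: range-style split of each topic's partitions over shared threads.
class StreamAssignmentSketch {

    /** For every topic, hands out contiguous partition ranges to the given number of threads. */
    static List<List<String>> assign(Map<String, Integer> partitionsPerTopic, int threads) {
        List<List<String>> perThread = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            int base = e.getValue() / threads;   // partitions every thread gets
            int extra = e.getValue() % threads;  // the first `extra` threads get one more
            int next = 1;                        // 1-based numbering, as in the example
            for (int t = 0; t < threads; t++) {
                int count = base + (t < extra ? 1 : 0);
                for (int k = 0; k < count; k++) {
                    perThread.get(t).add(e.getKey() + "-" + next++);
                }
            }
        }
        return perThread;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("AC", 3);
        topics.put("BC", 6);
        // createMessageStreamsByFilter("*C" => 3): 3 threads shared by both topics.
        System.out.println(assign(topics, 3));
        // createMessageStreams("AC" => 3, "BC" => 2): dedicated threads, 3 + 2 = 5 in total.
        System.out.println(assign(Collections.singletonMap("AC", 3), 3));
        System.out.println(assign(Collections.singletonMap("BC", 6), 2));
    }
}
```

The first line printed is [[AC-1, BC-1, BC-2], [AC-2, BC-3, BC-4], [AC-3, BC-5, BC-6]], which matches the shared 3-thread grouping in the example; the other two lines show the 3 + 2 dedicated threads.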
