Ah, I understand now. I didn't realize that there was one fetcher thread per broker.
Thanks Tao & Guozhang!

-James

On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao...@gmail.com> wrote:

> Fetcher threads work on a per-broker basis: there is at least one fetcher thread per broker. The fetcher thread sends the broker a fetch request asking for all partitions. So if A, B, and C are on the same broker, the fetcher thread is still able to fetch data from A, B, and C even though A returns no data. The same logic applies to different brokers.
>
> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:
>
>>
>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi James,
>>>
>>> What I meant before is that a single fetcher may be responsible for putting fetched data into multiple queues, depending on how the streams are set up, and each queue may be consumed by a different thread. The queues are also bounded. Now say there are two queues getting data from the same fetcher F, consumed by two different user threads A and B. If thread A for some reason gets slowed down or hangs while consuming data from queue 1, then queue 1 will eventually fill up, and F will block trying to put more data into it. Since F is parked trying to put data into queue 1, queue 2 will not get more data from it, and thread B may hence get starved. Does that make sense now?
>>>
>>
>> Yes, that makes sense. That is the scenario where one thread of a consumer can cause a backup in the queue, which would cause other threads to not receive data.
>>
>> What about the situation I described, where a thread consumes a queue that is supposed to be filled with messages from multiple partitions? If partition A has no messages and partitions B and C do, how will the fetcher behave? Will the processing thread receive messages from partitions B and C?
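Both behaviors discussed in this part of the thread can be illustrated with a small Java simulation: tao's point that a partition returning no data does not hold up the other partitions in a fetch, and Guozhang's point that a fetcher parked on one full bounded queue starves the other queues it feeds. This is a sketch under stated assumptions, not Kafka's fetcher code: the class and method names (`FetcherBlockingDemo`, `fetchRound`, `messagesSeenByB`), the queue capacity of 2, and modeling a fetch response as a map from partition to messages are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the behaviors discussed in the thread; this is not Kafka code.
public class FetcherBlockingDemo {

    // Scenario 1 (tao's answer): one fetch round covers partitions A, B and C.
    // The map stands in for a fetch response; an empty partition simply adds nothing,
    // so messages from B and C still reach the shared queue.
    public static List<String> fetchRound(Map<String, List<String>> response,
                                          BlockingQueue<String> queue) throws InterruptedException {
        for (Map.Entry<String, List<String>> e : response.entrySet()) {
            for (String msg : e.getValue()) {
                queue.put(e.getKey() + ":" + msg);
            }
        }
        List<String> received = new ArrayList<>();
        queue.drainTo(received); // what the consuming thread would see
        return received;
    }

    // Scenario 2 (Guozhang's explanation): fetcher F feeds two bounded queues in turn.
    // If consumer A never drains queue 1, F parks in queue1.put() and queue 2 gets nothing more.
    public static int messagesSeenByB(boolean consumerAHung, int perQueue) throws InterruptedException {
        BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<>(2); // capacity chosen arbitrarily
        BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<>(2);

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < perQueue; i++) {
                    queue1.put(i); // blocks while queue 1 is full
                    queue2.put(i);
                }
            } catch (InterruptedException ignored) {
                // demo shutdown
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();

        if (!consumerAHung) {
            Thread consumerA = new Thread(() -> {
                try {
                    while (true) {
                        queue1.take(); // a healthy consumer keeps draining queue 1
                    }
                } catch (InterruptedException ignored) {
                    // demo shutdown
                }
            });
            consumerA.setDaemon(true);
            consumerA.start();
        }

        // Consumer B counts messages until queue 2 has been idle for 500 ms.
        int seen = 0;
        while (queue2.poll(500, TimeUnit.MILLISECONDS) != null) {
            seen++;
        }
        fetcher.interrupt();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<String>> response = new LinkedHashMap<>();
        response.put("A", List.of()); // partition A has no messages
        response.put("B", List.of("b1", "b2"));
        response.put("C", List.of("c1"));
        System.out.println(fetchRound(response, new ArrayBlockingQueue<>(10))); // [B:b1, B:b2, C:c1]

        System.out.println(messagesSeenByB(false, 10)); // 10
        System.out.println(messagesSeenByB(true, 10));  // 2
    }
}
```

In the hung case, consumer B sees only 2 of its 10 messages: the fetcher completes two rounds before queue 1 (capacity 2) is full, then parks in `queue1.put`, so nothing more reaches queue 2. The 500 ms idle timeout exists only so the demo terminates.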
>>
>> Thanks,
>> -James
>>
>>
>>> Guozhang
>>>
>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Sorry to bring up this old thread, but my question is about this exact thing:
>>>>
>>>> Guozhang, you said:
>>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions.
>>>>>
>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>>>>
>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively.
>>>>
>>>>
>>>> You said that in the createMessageStreamsByFilter case, if topic AC had no messages in it and consumer.timeout.ms = -1, then the 3 threads might all be blocked waiting for data to arrive from topic AC, and so messages from BC would not be processed.
>>>>
>>>> createMessageStreamsByFilter("*C" => 1) (single stream) would have the same problem, just worse. Behind the scenes, is there a single thread that is consuming (round-robin?) messages from the different partitions and inserting them all into a single queue for the application code to process? And is that why a single partition with no messages will block the other messages from getting through?
>>>>
>>>> What about createMessageStreams("AC" => 1)? That creates a single stream that contains messages from multiple partitions, which might be on different brokers. Does that also suffer from the same problem, where, if one partition has no messages, the application would not receive messages from the other partitions?
>>>>
>>>> Thanks,
>>>> -James
>>>>
>>>>
>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>>> The new consumer will be released in 0.9, which is targeted for the end of this quarter.
>>>>>
>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>
>>>>>> Do you know when the new consumer API will be publicly available?
>>>>>>
>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, it can get stuck. For example, say AC and BC are processed by two different processes and the AC processor gets stuck: AC messages will then fill up the consumer's buffer and eventually prevent the fetcher thread from putting more data into it; the fetcher thread will be blocked on that and will not be able to fetch BC.
>>>>>>>
>>>>>>> This issue has been addressed in the new consumer client, which is single-threaded with non-blocking APIs.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you, Guozhang, for your detailed explanation. In your example, createMessageStreamsByFilter("*C" => 3), since threads are shared among topics, there may be a situation where all 3 threads get stuck on topic AC (e.g. the topic is empty), holding the connecting threads (with consumer.timeout.ms=-1), so there is no thread left to serve topic BC. Do you think this situation will happen?
>>>>>>>>
>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I was not clear before. For createMessageStreamsByFilter, each matched topic will have num-threads, but shared; i.e., there will be num-threads created in total, but each thread will be responsible for fetching all matched topics.
>>>>>>>>>
>>>>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions.
>>>>>>>>>
>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>>>>>>>>
>>>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Guozhang,
>>>>>>>>>>
>>>>>>>>>> Do you mean that each regex-matched topic owns the number of threads passed in to createMessageStreamsByFilter? For example, in the code below, if I have 3 matched topics, each of which has 2 partitions, then I should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
>>>>>>>>>>
>>>>>>>>>> TopicFilter filter = new Whitelist(".*");
>>>>>>>>>>
>>>>>>>>>> int threadTotal = 2;
>>>>>>>>>>
>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
>>>>>>>>>>         .createMessageStreamsByFilter(filter, threadTotal);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> But what I observed from the log is different:
>>>>>>>>>>
>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>
>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
>>>>>>>>>>
>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
>>>>>>>>>>
>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>
>>>>>>>>>> 2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker partitions consumed by consumer thread test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>>>>>>>>>>
>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
>>>>>>>>>>
>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>
>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
>>>>>>>>>>
>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
>>>>>>>>>>
>>>>>>>>>> As you can see from the log, only 2 threads are created, and they are shared among 3 topics. With this setting I think the parallelism is degraded, and a slow topic may impact other topics' consumption performance. Any thoughts?
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> createMessageStreams is used for consuming from specific topic(s), where you pass a map of [topic-name, num-threads] as its input parameter;
>>>>>>>>>>>
>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard topics, where you pass a (regex, num-threads) pair as its input parameters, and for each regex-matched topic num-threads will be created.
>>>>>>>>>>>
>>>>>>>>>>> The difference between these two is not really about throughput / latency, but rather consumption semantics.
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi team,
>>>>>>>>>>>>
>>>>>>>>>>>> I am comparing the differences between ConsumerConnector.createMessageStreams and ConsumerConnector.createMessageStreamsByFilter. My understanding is that createMessageStreams creates x number of threads (x is the number of threads passed in to the method) dedicated to the specified topic, while createMessageStreamsByFilter creates x number of threads shared by the topics specified by the TopicFilter. Is that correct?
>>>>>>>>>>>>
>>>>>>>>>>>> If this is the case, I assume createMessageStreams is the preferred way to create streams for each topic if I have high throughput and low latency demands. Is my assumption correct?
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Tao
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
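Guozhang's AC/BC example in the thread above can be reproduced with a short Java sketch of how partitions might be spread over streams (each stream being what the thread calls a "thread"). It assumes, as an illustration rather than a description of the old consumer's internals, that each topic's partitions are split into contiguous ranges across the available streams; the class and method names are hypothetical. `perTopicStreams` mimics createMessageStreams, where every topic gets its own streams, and `sharedStreams` mimics createMessageStreamsByFilter, where one set of streams is shared by all matched topics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of partition-to-stream assignment, assuming a per-topic range split.
public class StreamAssignmentSketch {

    // Splits partitions 1..numPartitions of one topic into contiguous ranges over
    // numStreams streams; the first (numPartitions % numStreams) streams get one extra.
    public static List<List<String>> rangeAssign(String topic, int numPartitions, int numStreams) {
        List<List<String>> streams = new ArrayList<>();
        int base = numPartitions / numStreams;
        int extra = numPartitions % numStreams;
        int next = 1;
        for (int s = 0; s < numStreams; s++) {
            List<String> owned = new ArrayList<>();
            int count = base + (s < extra ? 1 : 0);
            for (int i = 0; i < count; i++) {
                owned.add(topic + "-" + next++);
            }
            streams.add(owned); // may be empty if there are more streams than partitions
        }
        return streams;
    }

    // createMessageStreams-style: each topic gets its own dedicated streams.
    public static List<List<String>> perTopicStreams(Map<String, Integer> partitions,
                                                     Map<String, Integer> streamsPerTopic) {
        List<List<String>> all = new ArrayList<>();
        for (Map.Entry<String, Integer> e : streamsPerTopic.entrySet()) {
            all.addAll(rangeAssign(e.getKey(), partitions.get(e.getKey()), e.getValue()));
        }
        return all;
    }

    // createMessageStreamsByFilter-style: numStreams streams shared by all matched topics;
    // stream s collects range s of every topic.
    public static List<List<String>> sharedStreams(Map<String, Integer> partitions, int numStreams) {
        List<List<String>> shared = new ArrayList<>();
        for (int s = 0; s < numStreams; s++) {
            shared.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitions.entrySet()) {
            List<List<String>> ranges = rangeAssign(e.getKey(), e.getValue(), numStreams);
            for (int s = 0; s < numStreams; s++) {
                shared.get(s).addAll(ranges.get(s));
            }
        }
        return shared;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("AC", 3);
        partitions.put("BC", 6);

        Map<String, Integer> streamsPerTopic = new LinkedHashMap<>();
        streamsPerTopic.put("AC", 3);
        streamsPerTopic.put("BC", 2);

        // [[AC-1], [AC-2], [AC-3], [BC-1, BC-2, BC-3], [BC-4, BC-5, BC-6]]
        System.out.println(perTopicStreams(partitions, streamsPerTopic));
        // [[AC-1, BC-1, BC-2], [AC-2, BC-3, BC-4], [AC-3, BC-5, BC-6]]
        System.out.println(sharedStreams(partitions, 3));
    }
}
```

Running `sharedStreams` with tao's three topics (2, 1 and 2 partitions) and 2 streams leaves the second stream with no partition of the single-partition topic, which is consistent with the WARN line for kafkatopic-1 in his log (the log numbers partitions from 0, the sketch from 1).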