Is it always the case that there is only one fetcher per broker? Won’t setting num.replica.fetchers greater than the number of brokers cause more than one fetcher per broker? Let’s say I have 5 brokers and num.replica.fetchers is 8: will there be 2 fetcher threads pulling from each broker?
Thanks
Zakee

> On Mar 12, 2015, at 11:15 AM, James Cheng <jch...@tivo.com> wrote:
>
> Ah, I understand now. I didn't realize that there was one fetcher thread per
> broker.
>
> Thanks Tao & Guozhang!
> -James
>
>
> On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao...@gmail.com> wrote:
>
>> Fetcher thread is per broker basis, it ensures that at lease one fetcher
>> thread per broker. Fetcher thread is sent to broker with a fetch request to
>> ask for all partitions. So if A, B, C are in the same broker fetcher thread
>> is still able to fetch data from A, B, C even though A returns no data.
>> same logic is applied to different broker.
>>
>> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:
>>
>>>
>>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Hi James,
>>>>
>>>> What I meant before is that a single fetcher may be responsible for putting
>>>> fetched data to multiple queues according to the construction of the
>>>> streams setup, where each queue may be consumed by a different thread. And
>>>> the queues are actually bounded. Now say if there are two queues that are
>>>> getting data from the same fetcher F, and are consumed by two different
>>>> user threads A and B. If thread A for some reason got slowed / hung
>>>> consuming data from queue 1, then queue 1 will eventually get full, and F
>>>> trying to put more data to it will be blocked. Since F is parked on trying
>>>> to put data to queue 1, queue 2 will not get more data from it, and thread
>>>> B may hence gets starved. Does that make sense now?
>>>>
>>>
>>> Yes, that makes sense. That is the scenario where one thread of a consumer
>>> can cause a backup in the queue, which would cause other threads to not
>>> receive data.
>>>
>>> What about the situation I described, where a thread consumes a queue that
>>> is supposed to be filled with messages from multiple partitions?
>>> If partition A has no messages and partitions B and C do, how will the
>>> fetcher behave? Will the processing thread receive messages from
>>> partitions B and C?
>>>
>>> Thanks,
>>> -James
>>>
>>>
>>>> Guozhang
>>>>
>>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Sorry to bring up this old thread, but my question is about this exact
>>>>> thing:
>>>>>
>>>>> Guozhang, you said:
>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
>>>>>> partitions.
>>>>>>
>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
>>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>>>>>
>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
>>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
>>>>>> respectively.
>>>>>
>>>>>
>>>>> You said that in the createMessageStreamsByFilter case, if topic AC had no
>>>>> messages in it and consumer.timeout.ms = -1, then the 3 threads might all
>>>>> be blocked waiting for data to arrive from topic AC, and so messages from
>>>>> BC would not be processed.
>>>>>
>>>>> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
>>>>> same problem but just worse. Behind the scenes, is there a single thread
>>>>> that is consuming (round-robin?) messages from the different partitions and
>>>>> inserting them all into a single queue for the application code to process?
>>>>> And that is why a single partition with no messages with block the other
>>>>> messages from getting through?
>>>>>
>>>>> What about createMessageStreams("AC" => 1)? That creates a single stream
>>>>> that contains messages from multiple partitions, which might be on
>>>>> different brokers.
>>>>> Does that also suffer the same problem, where if one
>>>>> partition has no messages, that the application would not receive messages
>>>>> from the other paritions?
>>>>>
>>>>> Thanks,
>>>>> -James
>>>>>
>>>>>
>>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> The new consumer will be released in 0.9, which is targeted for end of this
>>>>>> quarter.
>>>>>>
>>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>
>>>>>>> Do you know when the new consumer API will be publicly available?
>>>>>>>
>>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, it can get stuck. For example, AC and BC are processed by two
>>>>>>>> different processes and AC processors gets stuck, hence AC messages will
>>>>>>>> fill up in the consumer's buffer and eventually prevents the fetcher thread
>>>>>>>> to put more data into it; the fetcher thread will be blocked on that and
>>>>>>>> not be able to fetch BC.
>>>>>>>>
>>>>>>>> This issue has been addressed in the new consumer client, which is
>>>>>>>> single-threaded with non-blocking APIs.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you Guozhang for your detailed explanation. In your example
>>>>>>>>> createMessageStreamsByFilter("*C" => 3) since threads are shared among
>>>>>>>>> topics there may be situation where all 3 threads threads get stuck with
>>>>>>>>> topic AC e.g. topic is empty which will be holding the connecting threads
>>>>>>>>> (setting consumer.timeout.ms=-1) hence there is no thread to serve topic
>>>>>>>>> BC. do you think this situation will happen?
>>>>>>>>>
>>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I was not clear before ..
>>>>>>>>>> for createMessageStreamsByFilter each matched
>>>>>>>>>> topic will have num-threads, but shared: i.e. there will be totally
>>>>>>>>>> num-threads created, but each thread will be responsible for fetching all
>>>>>>>>>> matched topics.
>>>>>>>>>>
>>>>>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
>>>>>>>>>> partitions.
>>>>>>>>>>
>>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
>>>>>>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>>>>>>>>>
>>>>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
>>>>>>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
>>>>>>>>>> respectively.
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Guozhang,
>>>>>>>>>>>
>>>>>>>>>>> Do you mean that each regex matched topic owns number of threads that get
>>>>>>>>>>> passed in to createMessageStreamsByFilter ? For example in below code If I
>>>>>>>>>>> have 3 matched topics each of which has 2 partitions then I should have 3 *
>>>>>>>>>>> 2 = 6 threads in total with each topic owning 2 threads.
>>>>>>>>>>>
>>>>>>>>>>> TopicFilter filter = new Whitelist(".*");
>>>>>>>>>>>
>>>>>>>>>>> int threadTotal = 2;
>>>>>>>>>>>
>>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
>>>>>>>>>>> .createMessageStreamsByFilter(filter, threadTotal);
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> But what I observed from the log is different
>>>>>>>>>>>
>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>>
>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>>>>> partition 1
>>>>>>>>>>>
>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>> partition 0
>>>>>>>>>>>
>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>>
>>>>>>>>>>> 2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker
>>>>>>>>>>> partitions consumed by consumer thread
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>>>>>>>>>>>
>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>> partition 0
>>>>>>>>>>>
>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>>
>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>>>>> partition 1
>>>>>>>>>>>
>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>> partition 0
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> As you can see from the log there are only 2 threads created and shared
>>>>>>>>>>> among 3 topics. With this setting I think the parallelism is degraded and a
>>>>>>>>>>> slow topic may impact other topics' consumption performance. Any thoughts?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> createMessageStreams is used for consuming from specific topic(s), where
>>>>>>>>>>>> you can put a map of [topic-name, num-threads] as its input parameters;
>>>>>>>>>>>>
>>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard topics,
>>>>>>>>>>>> where you can put a (regex, num-threads) as its input parameters, and for
>>>>>>>>>>>> each regex matched topic num-threads will be created.
>>>>>>>>>>>>
>>>>>>>>>>>> The difference between these two are not really for throughput / latency,
>>>>>>>>>>>> but rather consumption semantics.
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi team,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am comparing the differences between
>>>>>>>>>>>>> ConsumerConnector.createMessageStreams
>>>>>>>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My understanding is
>>>>>>>>>>>>> that createMessageStreams creates x number of threads (x is the number of
>>>>>>>>>>>>> threads passed in to the method) dedicated to the specified topic
>>>>>>>>>>>>> while createMessageStreamsByFilter creates x number of threads shared by
>>>>>>>>>>>>> topics specified by TopicFilter. Is it correct?
>>>>>>>>>>>>>
>>>>>>>>>>>>> If this is the case I assume createMessageStreams is the preferred way to
>>>>>>>>>>>>> create streams for each topic if I have high throughput and low latency
>>>>>>>>>>>>> demands. is my assumption correct?
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Tao
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Regards,
>>>>>>>>>>> Tao
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Regards,
>>>>>>>>> Tao
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Regards,
>>>>>>> Tao
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>
>>
>> --
>> Regards,
>> Tao
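
For anyone reading the thread later, the starvation scenario Guozhang describes above (one fetcher F feeding two bounded queues, with thread A hung and thread B healthy) can be sketched with plain java.util.concurrent queues. This is only a toy model, not the Kafka consumer code: the class name, method name and STOP marker are made up for illustration, and the 200 ms bounded offer() is an assumption standing in for the indefinitely blocking put, so that the demo terminates.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: not Kafka code. All names here are hypothetical.
public class FetcherStarvationSketch {
    private static final int STOP = -1; // marker telling consumer B to exit

    // Returns how many messages healthy consumer B received before the
    // single fetcher stalled on the full queue of hung consumer A.
    public static int messagesSeenByB(int queueCapacity, int rounds) {
        // queue1 belongs to thread A, which is hung and never reads from it.
        BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<>(queueCapacity);
        // queue2 belongs to thread B, which drains it as fast as it can.
        BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<>(queueCapacity);
        AtomicInteger seenByB = new AtomicInteger();

        Thread consumerB = new Thread(() -> {
            try {
                while (queue2.take() != STOP) {
                    seenByB.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumerB.start();

        try {
            // Fetcher F: one thread feeding both queues in turn.
            for (int msg = 0; msg < rounds; msg++) {
                // A blocking put() would park here forever once queue1 is
                // full; the bounded wait is an assumption so the demo ends.
                if (!queue1.offer(msg, 200, TimeUnit.MILLISECONDS)) {
                    break; // F is stuck on queue 1, so queue 2 gets nothing more
                }
                queue2.put(msg);
            }
            queue2.put(STOP);
            consumerB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenByB.get();
    }

    public static void main(String[] args) {
        // With capacity 2, B only ever sees 2 of the 10 messages.
        System.out.println("B received " + messagesSeenByB(2, 10) + " of 10 messages");
    }
}
```

In this model B is never slow itself; it simply stops receiving data once queue 1 is full, which is the behaviour described in the thread for a fetcher shared between streams.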