The fetchers pull data from a leader to the consumer. The replication fetcher is configured by a separate property.

On Saturday, March 14, 2015, Zakee <kzak...@netzero.net> wrote:

> Sorry, but still confused. Maximum number of threads (fetchers) to fetch
> from a Leader or maximum number of threads within a follower broker?
>
> Thanks for clarifying,
> -Zakee
>
> On Mar 12, 2015, at 11:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> > The number of fetchers is configurable via num.replica.fetchers. The
> > description of num.replica.fetchers in Kafka documentation is not quite
> > accurate. num.replica.fetchers actually controls the max number of
> > fetchers per broker. In your case, with num.replica.fetchers=8 and 5
> > brokers, that means no more than 8 fetchers are created for each broker.
> >
> > On Fri, Mar 13, 2015 at 1:21 PM, Zakee <kzak...@netzero.net> wrote:
> >
> >> Is this always the case that there is only one fetcher per broker, won’t
> >> setting num.replica.fetchers greater than number-of-brokers cause more
> >> fetchers per broker?
> >> Let’s say I have 5 brokers, and num of replica fetchers is 8, will there
> >> be 2 fetcher threads pulling from each broker?
> >>
> >> Thanks
> >> Zakee
> >>
> >>> On Mar 12, 2015, at 11:15 AM, James Cheng <jch...@tivo.com> wrote:
> >>>
> >>> Ah, I understand now. I didn't realize that there was one fetcher
> >>> thread per broker.
> >>>
> >>> Thanks Tao & Guozhang!
> >>> -James
> >>>
> >>> On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>
> >>>> The fetcher thread is on a per-broker basis; it ensures that there is
> >>>> at least one fetcher thread per broker. A fetcher thread sends the
> >>>> broker a fetch request asking for all partitions. So if A, B, C are in
> >>>> the same broker, the fetcher thread is still able to fetch data from
> >>>> A, B, C even though A returns no data. The same logic applies to
> >>>> different brokers.
> >>>>
> >>>> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:
> >>>>
> >>>>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi James,
> >>>>>>
> >>>>>> What I meant before is that a single fetcher may be responsible for
> >>>>>> putting fetched data to multiple queues according to the construction
> >>>>>> of the streams setup, where each queue may be consumed by a different
> >>>>>> thread. And the queues are actually bounded. Now say if there are two
> >>>>>> queues that are getting data from the same fetcher F, and are consumed
> >>>>>> by two different user threads A and B. If thread A for some reason got
> >>>>>> slowed / hung consuming data from queue 1, then queue 1 will
> >>>>>> eventually get full, and F trying to put more data to it will be
> >>>>>> blocked. Since F is parked on trying to put data to queue 1, queue 2
> >>>>>> will not get more data from it, and thread B may hence get starved.
> >>>>>> Does that make sense now?
> >>>>>>
> >>>>>
> >>>>> Yes, that makes sense. That is the scenario where one thread of a
> >>>>> consumer can cause a backup in the queue, which would cause other
> >>>>> threads to not receive data.
> >>>>>
> >>>>> What about the situation I described, where a thread consumes a queue
> >>>>> that is supposed to be filled with messages from multiple partitions?
> >>>>> If partition A has no messages and partitions B and C do, how will the
> >>>>> fetcher behave? Will the processing thread receive messages from
> >>>>> partitions B and C?
> >>>>>
> >>>>> Thanks,
> >>>>> -James
> >>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Sorry to bring up this old thread, but my question is about this
> >>>>>>> exact thing:
> >>>>>>>
> >>>>>>> Guozhang, you said:
> >>>>>>>> A more concrete example: say you have topic AC: 3 partitions,
> >>>>>>>> topic BC: 6 partitions.
> >>>>>>>>
> >>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5
> >>>>>>>> threads will be created, and consuming
> >>>>>>>> AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
> >>>>>>>>
> >>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
> >>>>>>>> will be created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
> >>>>>>>> AC-3/BC-5/BC-6 respectively.
> >>>>>>>
> >>>>>>> You said that in the createMessageStreamsByFilter case, if topic AC
> >>>>>>> had no messages in it and consumer.timeout.ms = -1, then the 3
> >>>>>>> threads might all be blocked waiting for data to arrive from topic
> >>>>>>> AC, and so messages from BC would not be processed.
> >>>>>>>
> >>>>>>> createMessageStreamsByFilter("*C" => 1) (single stream) would have
> >>>>>>> the same problem but just worse. Behind the scenes, is there a
> >>>>>>> single thread that is consuming (round-robin?) messages from the
> >>>>>>> different partitions and inserting them all into a single queue for
> >>>>>>> the application code to process? And that is why a single partition
> >>>>>>> with no messages will block the other messages from getting through?
> >>>>>>>
> >>>>>>> What about createMessageStreams("AC" => 1)? That creates a single
> >>>>>>> stream that contains messages from multiple partitions, which might
> >>>>>>> be on different brokers. Does that also suffer the same problem,
> >>>>>>> where if one partition has no messages, the application would not
> >>>>>>> receive messages from the other partitions?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> -James
> >>>>>>>
> >>>>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> The new consumer will be released in 0.9, which is targeted for
> >>>>>>>> end of this quarter.
> >>>>>>>>
> >>>>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Do you know when the new consumer API will be publicly available?
> >>>>>>>>>
> >>>>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Yes, it can get stuck. For example, AC and BC are processed by
> >>>>>>>>>> two different processes and the AC processor gets stuck, hence AC
> >>>>>>>>>> messages will fill up in the consumer's buffer and eventually
> >>>>>>>>>> prevent the fetcher thread from putting more data into it; the
> >>>>>>>>>> fetcher thread will be blocked on that and not be able to fetch
> >>>>>>>>>> BC.
> >>>>>>>>>>
> >>>>>>>>>> This issue has been addressed in the new consumer client, which
> >>>>>>>>>> is single-threaded with non-blocking APIs.
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thank you Guozhang for your detailed explanation. In your
> >>>>>>>>>>> example createMessageStreamsByFilter("*C" => 3), since threads
> >>>>>>>>>>> are shared among topics, there may be a situation where all 3
> >>>>>>>>>>> threads get stuck with topic AC, e.g. the topic is empty, which
> >>>>>>>>>>> will be holding the connecting threads (setting
> >>>>>>>>>>> consumer.timeout.ms=-1), hence there is no thread to serve topic
> >>>>>>>>>>> BC. Do you think this situation will happen?
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I was not clear before .. for createMessageStreamsByFilter each
> >>>>>>>>>>>> matched topic will have num-threads, but shared: i.e. there
> >>>>>>>>>>>> will be num-threads created in total, but each thread will be
> >>>>>>>>>>>> responsible for fetching all matched topics.
> >>>>>>>>>>>>
> >>>>>>>>>>>> A more concrete example: say you have topic AC: 3 partitions,
> >>>>>>>>>>>> topic BC: 6 partitions.
> >>>>>>>>>>>>
> >>>>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5
> >>>>>>>>>>>> threads will be created, and consuming
> >>>>>>>>>>>> AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
> >>>>>>>>>>>>
> >>>>>>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3
> >>>>>>>>>>>> threads will be created, and consuming AC-1/BC-1/BC-2,
> >>>>>>>>>>>> AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Do you mean that each regex matched topic owns the number of
> >>>>>>>>>>>>> threads that get passed in to createMessageStreamsByFilter?
> >>>>>>>>>>>>> For example, in the code below, if I have 3 matched topics
> >>>>>>>>>>>>> each of which has 2 partitions then I should have 3 * 2 = 6
> >>>>>>>>>>>>> threads in total with each topic owning 2 threads.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> TopicFilter filter = new Whitelist(".*");
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> int threadTotal = 2;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
> >>>>>>>>>>>>>     .createMessageStreamsByFilter(filter, threadTotal);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> But what I observed from the log is different:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>>>>>>>>>> following partitions: ArrayBuffer(0, 1) for topic kafkatopic-5
> >>>>>>>>>>>>> with consumers:
> >>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to
> >>>>>>>>>>>>> claim partition 1
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to
> >>>>>>>>>>>>> claim partition 0
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>>>>>>>>>> following partitions: ArrayBuffer(0) for topic kafkatopic-1
> >>>>>>>>>>>>> with consumers:
> >>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No
> >>>>>>>>>>>>> broker partitions consumed by consumer thread
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic
> >>>>>>>>>>>>> kafkatopic-1
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to
> >>>>>>>>>>>>> claim partition 0
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>>>>>>>>>> following partitions: ArrayBuffer(0, 1) for topic kafkatopic-4
> >>>>>>>>>>>>> with consumers:
> >>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to
> >>>>>>>>>>>>> claim partition 1
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to
> >>>>>>>>>>>>> claim partition 0
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As you can see from the log there are only 2 threads created
> >>>>>>>>>>>>> and shared among 3 topics. With this setting I think the
> >>>>>>>>>>>>> parallelism is degraded and a slow topic may impact other
> >>>>>>>>>>>>> topics' consumption performance. Any thoughts?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> createMessageStreams is used for consuming from specific
> >>>>>>>>>>>>>> topic(s), where you can put a map of [topic-name,
> >>>>>>>>>>>>>> num-threads] as its input parameters;
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from
> >>>>>>>>>>>>>> wildcard topics, where you can put a (regex, num-threads) as
> >>>>>>>>>>>>>> its input parameters, and for each regex matched topic
> >>>>>>>>>>>>>> num-threads will be created.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The difference between these two is not really for
> >>>>>>>>>>>>>> throughput / latency, but rather consumption semantics.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi team,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I am comparing the differences between
> >>>>>>>>>>>>>>> ConsumerConnector.createMessageStreams
> >>>>>>>>>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My
> >>>>>>>>>>>>>>> understanding is that createMessageStreams creates x number
> >>>>>>>>>>>>>>> of threads (x is the number of threads passed in to the
> >>>>>>>>>>>>>>> method) dedicated to the specified topic while
> >>>>>>>>>>>>>>> createMessageStreamsByFilter creates x number of threads
> >>>>>>>>>>>>>>> shared by topics specified by TopicFilter. Is it correct?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If this is the case I assume createMessageStreams is the
> >>>>>>>>>>>>>>> preferred way to create streams for each topic if I have
> >>>>>>>>>>>>>>> high throughput and low latency demands. Is my assumption
> >>>>>>>>>>>>>>> correct?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>> Tao
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> -- Guozhang

--
Regards,
Tao
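
The thread-to-partition splits quoted above (AC-1/BC-1/BC-2 and so on) can be reproduced with a small sketch. It assumes a simple range-style split per topic, which is consistent with the examples and the rebalance log in the thread but is not taken from the Kafka source. The names RangeAssignmentSketch, assign and assignShared are hypothetical and are not part of any Kafka API, and partition ids are 1-based only to match the AC-1, BC-1 naming used in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not Kafka code.
class RangeAssignmentSketch {

    // Splits partitions 1..numPartitions of one topic into contiguous ranges,
    // one range per thread. Earlier threads absorb any remainder, and a thread
    // gets an empty range when there are fewer partitions than threads.
    static List<List<String>> assign(String topic, int numPartitions, int numThreads) {
        List<List<String>> perThread = new ArrayList<>();
        int base = numPartitions / numThreads;
        int extra = numPartitions % numThreads;
        int next = 1; // 1-based to match the naming in the thread (assumption)
        for (int t = 0; t < numThreads; t++) {
            int count = base + (t < extra ? 1 : 0);
            List<String> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(topic + "-" + next);
                next++;
            }
            perThread.add(owned);
        }
        return perThread;
    }

    // Filter-style consumption as described in the thread: the same numThreads
    // threads are shared by every matched topic, so each thread accumulates
    // one range from each topic.
    static List<List<String>> assignShared(String[] topics, int[] partitionCounts, int numThreads) {
        List<List<String>> perThread = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int k = 0; k < topics.length; k++) {
            List<List<String>> oneTopic = assign(topics[k], partitionCounts[k], numThreads);
            for (int t = 0; t < numThreads; t++) {
                perThread.get(t).addAll(oneTopic.get(t));
            }
        }
        return perThread;
    }

    public static void main(String[] args) {
        // createMessageStreams("AC" => 3, "BC" => 2): dedicated threads per topic
        System.out.println(assign("AC", 3, 3));
        System.out.println(assign("BC", 6, 2));
        // createMessageStreamsByFilter("*C" => 3): three threads shared by both topics
        System.out.println(assignShared(new String[] {"AC", "BC"}, new int[] {3, 6}, 3));
        // One partition, two threads: the second thread owns nothing
        System.out.println(assign("T", 1, 2));
    }
}
```

The last call mirrors the "No broker partitions consumed by consumer thread" warning in the log: with a single-partition topic and two threads, the second thread is left without a partition for that topic.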
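
The starvation scenario Guozhang describes (one fetcher F feeding two bounded queues, with the consumer of queue 1 hung) can be simulated with plain java.util.concurrent queues. This is only a sketch of the blocking behaviour under stated assumptions: it does not use the Kafka consumer at all, and FetcherStarvationSketch, messagesSeenByB, the queue capacity and the idle timeout are all hypothetical values chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not Kafka code.
class FetcherStarvationSketch {

    // Runs one fetcher that alternates between two bounded queues. Nobody ever
    // drains queue 1 (thread A is "hung"); the calling thread plays thread B
    // and drains queue 2 until it sees no data for idleTimeoutMs.
    // Returns how many messages B received.
    static int messagesSeenByB(int capacity, int rounds, long idleTimeoutMs) {
        BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(capacity); // for hung thread A
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(capacity); // for healthy thread B

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    queue1.put("AC-" + i); // parks here once queue 1 is full
                    queue2.put("BC-" + i); // never reached while parked above
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();

        int seen = 0;
        try {
            while (queue2.poll(idleTimeoutMs, TimeUnit.MILLISECONDS) != null) {
                seen++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            fetcher.interrupt(); // release the parked fetcher so the demo can end
        }
        return seen;
    }

    public static void main(String[] args) {
        int seen = messagesSeenByB(2, 10, 500);
        System.out.println("B received " + seen + " of 10 messages");
    }
}
```

With a capacity of 2, the fetcher delivers BC-0 and BC-1, then parks on the third put to queue 1, so B stops receiving data even though eight more BC messages are waiting to be fetched.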