The new consumer will be released in 0.9, which is targeted for the end of this quarter.

On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:

> Do you know when the new consumer API will be publicly available?
>
> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Yes, it can get stuck. For example, AC and BC are processed by two
> > different processes and the AC processor gets stuck, hence AC messages
> > will fill up the consumer's buffer and eventually prevent the fetcher
> > thread from putting more data into it; the fetcher thread will be
> > blocked on that and not be able to fetch BC.
> >
> > This issue has been addressed in the new consumer client, which is
> > single-threaded with non-blocking APIs.
> >
> > Guozhang
> >
> > On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> > > Thank you Guozhang for your detailed explanation. In your example
> > > createMessageStreamsByFilter("*C" => 3), since threads are shared
> > > among topics, there may be a situation where all 3 threads get stuck
> > > with topic AC, e.g. the topic is empty, which will hold the connecting
> > > threads (setting consumer.timeout.ms=-1), hence there is no thread to
> > > serve topic BC. Do you think this situation will happen?
> > >
> > > On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > I was not clear before: for createMessageStreamsByFilter each
> > > > matched topic will have num-threads, but shared, i.e. there will be
> > > > num-threads threads created in total, and each thread will be
> > > > responsible for fetching all matched topics.
> > > >
> > > > A more concrete example: say you have topic AC: 3 partitions, topic
> > > > BC: 6 partitions.
> > > >
> > > > With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads
> > > > will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6
> > > > respectively;
> > > >
> > > > With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
> > > > will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
> > > > AC-3/BC-5/BC-6 respectively.
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
> > > >
> > > > > Guozhang,
> > > > >
> > > > > Do you mean that each regex-matched topic owns the number of
> > > > > threads that gets passed in to createMessageStreamsByFilter? For
> > > > > example, in the code below, if I have 3 matched topics each of
> > > > > which has 2 partitions, then I should have 3 * 2 = 6 threads in
> > > > > total, with each topic owning 2 threads.
> > > > >
> > > > > TopicFilter filter = new Whitelist(".*");
> > > > >
> > > > > int threadTotal = 2;
> > > > >
> > > > > List<KafkaStream<byte[], byte[]>> streams = connector
> > > > >     .createMessageStreamsByFilter(filter, threadTotal);
> > > > >
> > > > > But what I observed from the log is different:
> > > > >
> > > > > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
> > > > >
> > > > > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
> > > > >
> > > > > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> > > > >
> > > > > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
> > > > >
> > > > > 2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker partitions consumed by consumer thread test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
> > > > >
> > > > > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> > > > >
> > > > > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
> > > > >
> > > > > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
> > > > >
> > > > > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> > > > >
> > > > > As you can see from the log, only 2 threads are created and they
> > > > > are shared among 3 topics. With this setting I think the
> > > > > parallelism is degraded and a slow topic may impact other topics'
> > > > > consumption performance. Any thoughts?
> > > > >
> > > > > On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > createMessageStreams is used for consuming from specific
> > > > > > topic(s), where you can put a map of [topic-name, num-threads]
> > > > > > as its input parameters;
> > > > > >
> > > > > > createMessageStreamsByFilter is used for consuming from wildcard
> > > > > > topics, where you can put a (regex, num-threads) as its input
> > > > > > parameters, and for each regex matched topic num-threads will be
> > > > > > created.
> > > > > >
> > > > > > The difference between these two is not really about throughput
> > > > > > / latency, but rather consumption semantics.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi team,
> > > > > > >
> > > > > > > I am comparing the differences between
> > > > > > > ConsumerConnector.createMessageStreams and
> > > > > > > ConsumerConnector.createMessageStreamsByFilter. My
> > > > > > > understanding is that createMessageStreams creates x number of
> > > > > > > threads (x is the number of threads passed in to the method)
> > > > > > > dedicated to the specified topic, while
> > > > > > > createMessageStreamsByFilter creates x number of threads
> > > > > > > shared by the topics specified by TopicFilter. Is that correct?
> > > > > > >
> > > > > > > If this is the case, I assume createMessageStreams is the
> > > > > > > preferred way to create streams for each topic if I have high
> > > > > > > throughput and low latency demands. Is my assumption correct?
> > > > > > >
> > > > > > > --
> > > > > > > Regards,
> > > > > > > Tao
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > >
> > > > > --
> > > > > Regards,
> > > > > Tao
> > > >
> > > > --
> > > > -- Guozhang
> > >
> > > --
> > > Regards,
> > > Tao
> >
> > --
> > -- Guozhang
>
> --
> Regards,
> Tao

--
-- Guozhang
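
For reference, the thread counts in the AC/BC example quoted above (AC with 3 partitions, BC with 6) can be reproduced with a small standalone model. This is only a sketch: it does not call Kafka, the class and method names are hypothetical, and it assumes each topic's partitions are split into contiguous ranges across the threads. That assumption matches the example and the quoted log (thread -1 gets nothing for the single-partition topic), but the thread does not confirm it as the consumer's actual algorithm.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; nothing here is part of the Kafka API.
final class ThreadAssignmentSketch {

    // Splits partitions 1..partitionCount of one topic into contiguous
    // ranges across threadCount threads (assumed range-style split).
    static List<List<String>> assign(String topic, int partitionCount, int threadCount) {
        List<List<String>> perThread = new ArrayList<>();
        int base = partitionCount / threadCount;
        int extra = partitionCount % threadCount;
        int next = 1; // numbered from 1 to match the AC-1, BC-1 naming in the thread
        for (int t = 0; t < threadCount; t++) {
            int count = base + (t < extra ? 1 : 0);
            List<String> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(topic + "-" + next++);
            }
            perThread.add(owned);
        }
        return perThread;
    }

    // createMessageStreams-style: each topic gets its own dedicated threads.
    static List<List<String>> dedicated(Map<String, Integer> partitions,
                                        Map<String, Integer> threadsPerTopic) {
        List<List<String>> threads = new ArrayList<>();
        for (Map.Entry<String, Integer> e : threadsPerTopic.entrySet()) {
            threads.addAll(assign(e.getKey(), partitions.get(e.getKey()), e.getValue()));
        }
        return threads;
    }

    // createMessageStreamsByFilter-style: one pool of threads shared by all matched topics.
    static List<List<String>> shared(Map<String, Integer> partitions, int threadCount) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            threads.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitions.entrySet()) {
            List<List<String>> ranges = assign(e.getKey(), e.getValue(), threadCount);
            for (int t = 0; t < threadCount; t++) {
                threads.get(t).addAll(ranges.get(t));
            }
        }
        return threads;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("AC", 3);
        partitions.put("BC", 6);

        Map<String, Integer> threadsPerTopic = new LinkedHashMap<>();
        threadsPerTopic.put("AC", 3);
        threadsPerTopic.put("BC", 2);

        // prints [[AC-1], [AC-2], [AC-3], [BC-1, BC-2, BC-3], [BC-4, BC-5, BC-6]]
        System.out.println(dedicated(partitions, threadsPerTopic));
        // prints [[AC-1, BC-1, BC-2], [AC-2, BC-3, BC-4], [AC-3, BC-5, BC-6]]
        System.out.println(shared(partitions, 3));
    }
}
```

In the shared case every thread owns partitions of both topics, which is why a stalled AC consumer can also hold back BC, as described in the thread.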