I was not clear before. For createMessageStreamsByFilter, each matched topic gets num-threads threads, but they are shared: num-threads threads are created in total, and each thread is responsible for fetching from all matched topics.
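For what it is worth, here is a rough simulation of the two behaviours. It is only a sketch, not Kafka's rebalance code: the class and method names are made up for illustration, partitions are numbered from 1 to match the example below, and the range-style split per topic is an assumption that happens to reproduce the numbers in that example. The filter is spelled ".*C" here so that it is a valid Java regex.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified simulation of how partitions map to consumer threads.
// Hypothetical helper class, NOT part of Kafka.
class StreamAssignmentSketch {

    // Split partitions 1..numPartitions of one topic into contiguous
    // ranges, one range per thread (assumed range-style assignment).
    static List<List<String>> rangeAssign(String topic, int numPartitions, int numThreads) {
        List<List<String>> perThread = new ArrayList<>();
        int base = numPartitions / numThreads;
        int extra = numPartitions % numThreads;
        int next = 1;
        for (int t = 0; t < numThreads; t++) {
            // The first "extra" threads take one additional partition.
            int count = base + (t < extra ? 1 : 0);
            List<String> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(topic + "-" + next++);
            }
            perThread.add(owned);
        }
        return perThread;
    }

    // createMessageStreams style: every topic gets its own dedicated threads.
    static List<List<String>> perTopicThreads(Map<String, Integer> partitions,
                                              Map<String, Integer> threadsPerTopic) {
        List<List<String>> threads = new ArrayList<>();
        for (Map.Entry<String, Integer> e : threadsPerTopic.entrySet()) {
            threads.addAll(rangeAssign(e.getKey(), partitions.get(e.getKey()), e.getValue()));
        }
        return threads;
    }

    // createMessageStreamsByFilter style: numThreads threads in total,
    // each one owning a slice of every topic that matches the regex.
    static List<List<String>> sharedThreads(Map<String, Integer> partitions,
                                            String regex, int numThreads) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitions.entrySet()) {
            if (!e.getKey().matches(regex)) {
                continue;
            }
            List<List<String>> ranges = rangeAssign(e.getKey(), e.getValue(), numThreads);
            for (int t = 0; t < numThreads; t++) {
                threads.get(t).addAll(ranges.get(t));
            }
        }
        return threads;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("AC", 3);
        partitions.put("BC", 6);

        Map<String, Integer> threadsPerTopic = new LinkedHashMap<>();
        threadsPerTopic.put("AC", 3);
        threadsPerTopic.put("BC", 2);

        System.out.println(perTopicThreads(partitions, threadsPerTopic));
        System.out.println(sharedThreads(partitions, ".*C", 3));
    }
}
```

Running main prints one list per thread: five lists for the per-topic case and three for the shared case, where every thread holds partitions from both topics.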
A more concrete example: say you have topic AC with 3 partitions and topic BC with 6 partitions.

With createMessageStreams("AC" => 3, "BC" => 2), a total of 5 threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and BC-4/5/6 respectively.

With createMessageStreamsByFilter("*C" => 3), a total of 3 threads will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and AC-3/BC-5/BC-6 respectively.

Guozhang

On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:

> Guozhang,
>
> Do you mean that each regex-matched topic owns the number of threads that
> gets passed in to createMessageStreamsByFilter? For example, in the code
> below, if I have 3 matched topics, each of which has 2 partitions, then I
> should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
>
> TopicFilter filter = new Whitelist(".*");
>
> int threadTotal = 2;
>
> List<KafkaStream<byte[], byte[]>> streams = connector
>     .createMessageStreamsByFilter(filter, threadTotal);
>
> But what I observed from the log is different:
>
> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
> List(test1234dd5_localhost-1423585444070-82f23758-0,
> test1234dd5_localhost-1423585444070-82f23758-1)
>
> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> partition 1
>
> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> partition 0
>
> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> List(test1234dd5_localhost-1423585444070-82f23758-0,
> test1234dd5_localhost-1423585444070-82f23758-1)
>
> 2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker
> partitions consumed by consumer thread
> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>
> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> partition 0
>
> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> List(test1234dd5_localhost-1423585444070-82f23758-0,
> test1234dd5_localhost-1423585444070-82f23758-1)
>
> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> partition 1
>
> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> partition 0
>
> As you can see from the log, only 2 threads are created, and they are
> shared among the 3 topics. With this setting I think parallelism is
> degraded, and a slow topic may impact the consumption performance of the
> other topics. Any thoughts?
>
> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > createMessageStreams is used for consuming from specific topic(s), where
> > you can put a map of [topic-name, num-threads] as its input parameters;
> >
> > createMessageStreamsByFilter is used for consuming from wildcard topics,
> > where you can put a (regex, num-threads) as its input parameters, and for
> > each regex matched topic num-threads will be created.
> >
> > The difference between these two is not really about throughput /
> > latency, but rather about consumption semantics.
> >
> > Guozhang
> >
> > On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> > > Hi team,
> > >
> > > I am comparing the differences between
> > > ConsumerConnector.createMessageStreams
> > > and ConsumerConnector.createMessageStreamsByFilter. My understanding
> > > is that createMessageStreams creates x threads (x is the number of
> > > threads passed in to the method) dedicated to the specified topic,
> > > while createMessageStreamsByFilter creates x threads shared by the
> > > topics specified by the TopicFilter. Is that correct?
> > >
> > > If this is the case, I assume createMessageStreams is the preferred
> > > way to create streams for each topic if I have high-throughput and
> > > low-latency demands. Is my assumption correct?
> > >
> > > --
> > > Regards,
> > > Tao
> >
> > --
> > -- Guozhang
>
> --
> Regards,
> Tao

--
-- Guozhang