I was not clear before. With createMessageStreamsByFilter, num-threads is
not allocated per matched topic; the threads are shared. That is, a total
of num-threads threads will be created, and each thread will be
responsible for fetching from all matched topics.

A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
partitions.

With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, and BC-4/5/6
respectively.

With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, and AC-3/BC-5/BC-6
respectively.
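
To make the arithmetic above easy to check, here is a small toy model in
Java. It is only a sketch and does not call Kafka: the class and method
names (StreamAssignmentSketch, rangeAssign, dedicated, shared) are made up
for illustration, partitions are numbered from 1 to match the example
(real partition ids start at 0), and it assumes each topic's partitions
are split into contiguous ranges across the threads that serve that topic,
which is consistent with the numbers above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two assignment patterns; hypothetical names, not Kafka code. */
final class StreamAssignmentSketch {

    /** Splits partitions 1..numPartitions of one topic into numThreads contiguous ranges. */
    static List<List<String>> rangeAssign(String topic, int numPartitions, int numThreads) {
        List<List<String>> perThread = new ArrayList<>();
        int base = numPartitions / numThreads;   // every thread gets at least this many
        int extra = numPartitions % numThreads;  // the first 'extra' threads get one more
        int next = 1;                            // 1-based, to match the example
        for (int t = 0; t < numThreads; t++) {
            int count = base + (t < extra ? 1 : 0);
            List<String> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(topic + "-" + next++);
            }
            perThread.add(owned);
        }
        return perThread;
    }

    /** Models createMessageStreams: each topic gets its own dedicated threads. */
    static List<List<String>> dedicated(Map<String, Integer> partitions,
                                        Map<String, Integer> threadsPerTopic) {
        List<List<String>> threads = new ArrayList<>();
        for (Map.Entry<String, Integer> e : threadsPerTopic.entrySet()) {
            threads.addAll(rangeAssign(e.getKey(), partitions.get(e.getKey()), e.getValue()));
        }
        return threads;
    }

    /** Models createMessageStreamsByFilter: the same threads serve every matched topic. */
    static List<List<String>> shared(Map<String, Integer> partitions, int numThreads) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitions.entrySet()) {
            List<List<String>> ranges = rangeAssign(e.getKey(), e.getValue(), numThreads);
            for (int t = 0; t < numThreads; t++) {
                threads.get(t).addAll(ranges.get(t));
            }
        }
        return threads;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("AC", 3);
        partitions.put("BC", 6);

        Map<String, Integer> threadsPerTopic = new LinkedHashMap<>();
        threadsPerTopic.put("AC", 3);
        threadsPerTopic.put("BC", 2);

        // One inner list per thread.
        System.out.println(dedicated(partitions, threadsPerTopic));
        System.out.println(shared(partitions, 3));
    }
}
```

Running main prints five per-thread lists for the first case and three for
the second. In the shared case, a topic with fewer partitions than threads
leaves some threads with nothing to fetch for that topic.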

Guozhang

On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:

> Guozhang,
>
> Do you mean that each regex-matched topic owns the number of threads that
> gets passed in to createMessageStreamsByFilter? For example, in the code
> below, if I have 3 matched topics, each of which has 2 partitions, then I
> should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
>
> TopicFilter filter = new Whitelist(".*");
>
>  int threadTotal = 2;
>
>  List<KafkaStream<byte[], byte[]>> streams = connector
> .createMessageStreamsByFilter(filter, threadTotal);
>
>
> But what I observed from the log is different
>
> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
> List(test1234dd5_localhost-1423585444070-82f23758-0,
> test1234dd5_localhost-1423585444070-82f23758-1)
>
> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> partition 1
>
> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> partition 0
>
> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> List(test1234dd5_localhost-1423585444070-82f23758-0,
> test1234dd5_localhost-1423585444070-82f23758-1)
>
> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
> partitions consumed by consumer thread
> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>
> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> partition 0
>
> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> List(test1234dd5_localhost-1423585444070-82f23758-0,
> test1234dd5_localhost-1423585444070-82f23758-1)
>
> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> partition 1
>
> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> partition 0
>
>
> As you can see from the log there are only 2 threads created and shared
> among 3 topics. With this setting I think the parallelism is degraded and a
> slow topic may impact other topics' consumption performance. Any thoughts?
>
> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > createMessageStreams is used for consuming from specific topic(s), where
> > you can put a map of [topic-name, num-threads] as its input parameters;
> >
> > createMessageStreamsByFilter is used for consuming from wildcard topics,
> > where you can put a (regex, num-threads) as its input parameters, and for
> > each regex matched topic num-threads will be created.
> >
> > The difference between these two is not really about throughput /
> > latency, but rather consumption semantics.
> >
> > Guozhang
> >
> >
> > On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> > > Hi team,
> > >
> > > I am comparing the differences between
> > > ConsumerConnector.createMessageStreams
> > > and ConsumerConnector.createMessageStreamsByFilter. My understanding is
> > > that createMessageStreams creates x number of threads (x is the number
> > > of threads passed in to the method) dedicated to the specified topic
> > > while createMessageStreamsByFilter creates x number of threads shared
> > > by topics specified by TopicFilter. Is it correct?
> > >
> > > If this is the case I assume createMessageStreams is the preferred way
> > > to create streams for each topic if I have high throughput and low
> > > latency demands. Is my assumption correct?
> > >
> > > --
> > > Regards,
> > > Tao
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Regards,
> Tao
>



-- 
-- Guozhang
