Yes, it can get stuck. For example, suppose AC and BC are processed by two
different processes and the AC process gets stuck. AC messages will then
fill up the consumer's buffer and eventually prevent the fetcher thread
from putting more data into it; the fetcher thread will block on that and
will not be able to fetch BC.
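
To make the blocking concrete, here is a minimal sketch in plain Java. It is
not Kafka code: the class name SharedFetcherDemo, the method deliveredBc and
the buffer sizes are made up for illustration, and it simply assumes one
fetcher thread feeding a bounded buffer per topic, where nobody ever drains
the AC buffer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class SharedFetcherDemo {

    /**
     * Hypothetical model: a single fetcher thread alternates between putting
     * an AC message and a BC message into bounded buffers. The AC buffer is
     * never drained (its processor is "stuck"); the BC buffer is drained by
     * the calling thread. Returns how many BC messages were delivered before
     * the BC side saw no data for waitMillis.
     */
    static int deliveredBc(int bufferSize, int rounds, long waitMillis) {
        BlockingQueue<String> acBuffer = new ArrayBlockingQueue<>(bufferSize);
        BlockingQueue<String> bcBuffer = new ArrayBlockingQueue<>(bufferSize);

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    acBuffer.put("AC-" + i); // blocks once acBuffer is full
                    bcBuffer.put("BC-" + i); // never reached while blocked above
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();

        // Healthy BC processor: keeps consuming until the buffer stays empty.
        int delivered = 0;
        try {
            while (bcBuffer.poll(waitMillis, TimeUnit.MILLISECONDS) != null) {
                delivered++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        fetcher.interrupt(); // release the fetcher blocked on the full AC buffer
        return delivered;
    }

    public static void main(String[] args) {
        int delivered = deliveredBc(2, 10, 500);
        System.out.println("BC messages delivered: " + delivered + " of 10");
    }
}
```

In this model the BC processor is perfectly healthy, yet it only receives as
many messages as fit before the AC buffer fills up, because the one thread
that feeds both buffers is parked on the AC put.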

This issue has been addressed in the new consumer client, which is
single-threaded with non-blocking APIs.

Guozhang

On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:

> Thank you Guozhang for your detailed explanation. In your example
> createMessageStreamsByFilter("*C" => 3), since threads are shared among
> topics, there may be a situation where all 3 threads get stuck on
> topic AC, e.g. the topic is empty, which will block those threads
> (with consumer.timeout.ms=-1), hence there is no thread left to serve topic
> BC. Do you think this situation can happen?
>
> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I was not clear before: for createMessageStreamsByFilter, each matched
> > topic is served by num-threads threads, but they are shared, i.e. a total
> > of num-threads threads will be created, and each thread will be
> > responsible for fetching from all matched topics.
> >
> > A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> > partitions.
> >
> > With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
> > be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6 respectively.
> >
> > With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
> > created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> > respectively.
> >
> > Guozhang
> >
> > On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> > > Guozhang,
> > >
> > > Do you mean that each regex-matched topic owns the number of threads that
> > > gets passed in to createMessageStreamsByFilter? For example, in the code
> > > below, if I have 3 matched topics, each of which has 2 partitions, then I
> > > should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
> > >
> > > TopicFilter filter = new Whitelist(".*");
> > > int threadTotal = 2;
> > > List<KafkaStream<byte[], byte[]>> streams =
> > >     connector.createMessageStreamsByFilter(filter, threadTotal);
> > >
> > >
> > > But what I observed from the log is different:
> > >
> > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> > > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > > partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
> > > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > > test1234dd5_localhost-1423585444070-82f23758-1)
> > >
> > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > > test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> > > partition 1
> > >
> > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > > partition 0
> > >
> > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> > > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > > partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> > > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > > test1234dd5_localhost-1423585444070-82f23758-1)
> > >
> > > 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
> > > partitions consumed by consumer thread
> > > test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
> > >
> > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > > partition 0
> > >
> > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> > > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > > partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> > > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > > test1234dd5_localhost-1423585444070-82f23758-1)
> > >
> > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > > test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> > > partition 1
> > >
> > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > > partition 0
> > >
> > >
> > > As you can see from the log there are only 2 threads created and shared
> > > among 3 topics. With this setting I think the parallelism is degraded
> > > and a slow topic may impact other topics' consumption performance. Any
> > > thoughts?
> > >
> > > On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > createMessageStreams is used for consuming from specific topic(s),
> > > > where you can put a map of [topic-name, num-threads] as its input
> > > > parameter;
> > > >
> > > > createMessageStreamsByFilter is used for consuming from wildcard
> > > > topics, where you can put a (regex, num-threads) pair as its input
> > > > parameters, and for each regex-matched topic num-threads will be
> > > > created.
> > > >
> > > > The difference between the two is not really about throughput /
> > > > latency, but rather consumption semantics.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> > > >
> > > > > Hi team,
> > > > >
> > > > > I am comparing the differences between
> > > > > ConsumerConnector.createMessageStreams
> > > > > and ConsumerConnector.createMessageStreamsByFilter. My understanding
> > > > > is that createMessageStreams creates x threads (x is the number of
> > > > > threads passed in to the method) dedicated to the specified topic,
> > > > > while createMessageStreamsByFilter creates x threads shared by
> > > > > the topics specified by TopicFilter. Is that correct?
> > > > >
> > > > > If this is the case, I assume createMessageStreams is the preferred
> > > > > way to create streams for each topic if I have high-throughput and
> > > > > low-latency demands. Is my assumption correct?
> > > > >
> > > > > --
> > > > > Regards,
> > > > > Tao
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > Regards,
> > > Tao
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Regards,
> Tao
>



-- 
-- Guozhang
