Rediscovering a new consumer member within the group is part of the
consumer rebalance protocol, which Streams simply relies on. More details
can be found here:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal

A one-sentence summary: the new consumer notifies the coordinator of its
existence, and the coordinator then notifies the other consumers in the group
to rebalance their tasks so that some can be migrated to the newcomer.
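
To make that concrete, here is a toy sketch in plain Java. It is not Kafka's
actual assignor and does not use any Kafka API: the class name, the member
names, and the round-robin policy are all assumptions made up for
illustration. It only shows the idea that the assignment is recomputed over
the current membership whenever a member joins, so the newcomer ends up
owning partitions that other members held before.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RebalanceSketch {

    // Hypothetical stand-in for the assignment step of a rebalance:
    // spread partitions 0..numPartitions-1 over the current members
    // in round-robin order. Real assignors are more involved.
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("group has no members");
        }
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            String owner = members.get(p % members.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two surviving members share four partitions.
        List<String> members = new ArrayList<>(List.of("A-thread-1", "B-thread-1"));
        System.out.println("before join: " + assign(members, 4));

        // The restarted instance joins; the whole group is reassigned.
        members.add("C-thread-1");
        System.out.println("after join:  " + assign(members, 4));
    }
}
```

In the real protocol none of this needs to be configured by the application;
the sketch is only meant to show why existing threads give up partitions once
a new member appears.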

Guozhang

On Fri, Feb 3, 2017 at 2:51 AM, Sachin Mittal <sjmit...@gmail.com> wrote:

> > Any reason why you don't just let streams create the changelog topic? Yes
> > you should partition it the same as the source topic.
>
> The only reason is that I need to set my own max.message.bytes, and in
> version 0.10.0.1 configuring that through the state store supplier is not
> supported. But I understand that the number of partitions should be the
> same as the source topic's. I will take care of that.
>
> > When an instance fails or is shutdown it will be removed from the consumer
> > group. When it is removed a rebalance will be triggered. The partitions
> > will be re-assigned to the remaining threads.
>
> I understood this part. However, I did not understand what happens when we
> restart the same (failed) instance: by that time all the existing threads
> have already rebalanced and are processing different partitions. When this
> new instance comes up, how and when will some of the existing threads give
> up (some of) their partitions and shift them to the threads of this new
> instance?
>
> I looked at the new consumer configs. Is metadata.max.age.ms somehow part
> of this functionality for rediscovering a new consumer?
>
> Thanks
> Sachin
>
>
> On Fri, Feb 3, 2017 at 3:26 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > On 3 February 2017 at 09:07, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > >
> > > 1. What I wanted to know is: for separate machines each running an
> > > instance of the same streams application, my application.id would be
> > > the same, right?
> > >
> >
> > That is correct.
> >
> >
> > > If yes, then how does the Kafka cluster know which partition to assign
> > > to which machine and which thread?
> > > What I see is that on the same machine each thread has its unique name,
> > > so it will get messages from given partition(s) only, but how does the
> > > Kafka cluster know that one machine's threads are different from some
> > > other machine's?
> > > How does it distinguish thread-1 on machine A from thread-1 on machine
> > > B? Do I need to configure something here?
> > >
> > >
> > This is all taken care of by the Kafka Consumer. All application instances
> > and threads with the same application.id are part of the same consumer
> > group. So the kafka consumer will ensure the partitions are balanced across
> > the available threads.
> >
> >
> > > 2. Also, my stream processing creates an internal changelog topic which
> > > is backed by a RocksDB state store.
> > > - So should I partition that topic too, with the same number of
> > > partitions as my source topic? (Here I am creating that changelog topic
> > > manually.)
> > >
> > >
> > Any reason why you don't just let streams create the changelog topic? Yes
> > you should partition it the same as the source topic.
> >
> >
> >
> > > 3. If I don't create that changelog topic manually and let Kafka Streams
> > > create it automatically, then what number of partitions does it use?
> > >
> > >
> > The same number as the source topic.
> >
> >
> > > 4. Suppose my changelog topic has a single partition (if that is
> > > allowed) and now we have multiple threads accessing it. Is there any
> > > deadlock situation I need to worry about?
> > >
> >
> > Multiple threads will never access a single partition in a kafka streams
> > app. A partition is only ever consumed by a single thread.
> >
> >
> > >
> > > 5. Also, multiple threads will now access the same state store and
> > > attempt to recreate it from the changelog topic if there is a need. How
> > > does this work?
> > >
> > >
> > It is done on a per partition basis. You have a state store for each
> > partition.
> >
> >
> > > 6. Lastly, say one instance fails; the other instances will then try to
> > > balance the load among themselves. When I bring that instance back up,
> > > how do partitions get reassigned to it? At what point does some old
> > > thread stop processing a partition and a new thread of the new instance
> > > take over? Is there any configuration needed here?
> > >
> >
> > When an instance fails or is shutdown it will be removed from the consumer
> > group. When it is removed a rebalance will be triggered. The partitions
> > will be re-assigned to the remaining threads.
> >
> > There are some settings you can adjust that will affect the length of time
> > it takes for a dead consumer to be detected.
> >
> > i.e.,
> > max.poll.interval.ms
> > heartbeat.interval.ms
> > session.timeout.ms
> >
> > I suggest you take a look at the consumer config docs here:
> > https://kafka.apache.org/documentation/#newconsumerconfigs
> >
> > Thanks,
> > Damian
> >
>



-- 
-- Guozhang
