Thanks Guozhang for your reply.

I have a few follow-ups based on your response. Writing them inline would
have made the thread hard to read, so here is an extract:

1) *Internal topics use the default retention policy.*
Would it be better to add a separate config for this? Or something like
topic.log.retention.hours=_kstreams_internal_:5?
I think the retention needs of internal topics are, in general, going to be
very different from those of regular topics.

2) *Internal-topic's number of partitions is equal to the number of
partitions of source-topic.*
I also read in the doc
<http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html>
*"Since stream joins are performed over the keys of records, it is required
that joining streams are co-partitioned by key,*
*i.e., their corresponding Kafka topics must have the same number of
partitions and partitioned on the same key so that records with the same
keys are delivered to the same processing thread."*

So if I join a KStream from a topic with 4 partitions with a KTable from a
topic with 2 partitions, how is the join performed?
The source topics have different numbers of partitions, but the internal
streams used for the join should have the same number of partitions.
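
For illustration, here is roughly what I mean (topic names, record types,
and the joiner are made up):

  KStream<String, GenericRecord> views = builder.stream("PageViews");   // 4 partitions
  KTable<String, GenericRecord> users = builder.table("UserProfile");   // 2 partitions

  // Join over the record key -- but the two source topics are not co-partitioned
  KStream<String, GenericRecord> enriched =
      views.leftJoin(users, (view, user) -> view);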

Thanks,
Srikanth

On Thu, May 19, 2016 at 2:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Srikanth,
>
> Thanks for your questions, please see replies inlined.
>
>
> On Tue, May 17, 2016 at 7:36 PM, Srikanth <srikanth...@gmail.com> wrote:
>
> > Hi,
> >
> > I was reading about Kafka streams and trying to understand its
> programming
> > model.
> > Some observations that I wanted to get some clarity on..
> >
> > 1) Joins & aggregations use an internal topic for the shuffle. Source
> > processors will write to this topic with the key used for the join. Then
> > they are free to commit the offset for that run.
> > Does that mean we have to rely on the internal topic's replication to
> > guarantee at-least-once processing?
> >
> That is right, and users can set the "replication.factor" config in
> StreamsConfig to control the replication of these internal topics.
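>
> For example, a rough sketch (assuming the usual StreamsConfig setup; the
> application id and broker address are placeholders):
>
>   Properties props = new Properties();
>   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pageview-app");
>   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
>   // replication factor used for the internal (repartition/changelog) topics
>   props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);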
>
>
>
> >   Won't these replicated internal topics put too much load on the brokers?
> > We have to scale up brokers whenever our stream processing needs increase.
> > It's easy to add/remove processing nodes running KStream app instances, but
> > adding/removing brokers is not that straightforward.
> >
> This is a good question. We are working on recommendations about how to
> easily re-process (possibly after scaling partitions) with Kafka Streams,
> and Matthias (cc'ed) may be able to come back to you on this.
>
>
> >
> > 2) How many partitions are created for the internal topic? What is the
> > retention policy?
> > When/how is it created, and how is it cleaned up when the streaming
> > application is shut down permanently?
> > Any link with a deep dive into this would be helpful.
> >
> The number of partitions is currently set to the number of tasks writing
> to those topics (which is in turn determined by the source topic), with the
> default retention policy, and the topic is created upon task initialization.
> For example, if you have a topology as:
>
> source-topic -> sourceNode -> processNode -> sinkNode -> internal-topic ->
> sourceNode -> ...
>
> Then the internal-topic's number of partitions is equal to the number of
> partitions of source-topic.
>
> We are still working on cleaning up such topics when users do not want to
> re-process any more; there is a JIRA open for this:
> https://issues.apache.org/jira/browse/KAFKA-3185
>
>
> >
> > 3) I'm a bit confused about the threading model. Let's take the example
> > below. Assume "PageViews" and "UserProfile" have four partitions each.
> > I start two instances of this app, both with two threads. So we have four
> > threads altogether.
> > My understanding is that each thread will now receive records from one
> > partition of each topic. After reading, they'll do the map, filter, etc.,
> > and write to the internal topic for the join.
> > Then the threads go on to read the next set of records from the previous
> > offset. I guess this can be viewed as some sort of chaining within a
> > stage.
> >
> > Now where does the thread that reads from the internal topic run? Do they
> > share the same set of threads?
> >
>
> Sub-topologies that are "cut" by the internal topics are considered not
> connected, and are assigned to different tasks.
>
> So again back to this example:
>
> [ source-topic -> sourceNode -> processNode -> sinkNode -> ]  [
> internal-topic -> sourceNode -> ... ]
>
> It is translated into two sub-topologies. With four partitions of the
> source-topic it will get 8 tasks in total: 4 of them for the first
> sub-topology and the other 4 for the second sub-topology. Multiple tasks
> can be hosted on the same thread, so if you have 4 threads in total each
> one will host 2 tasks, and the assignment is done by the library to
> achieve a shorter restoration process.
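>
> For example (a sketch, using the same StreamsConfig properties object as in
> the snippet above):
>
>   // two threads per application instance; two instances => four threads total
>   props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);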
>
> > Can we increase the parallelism for just this processor? Say I know that
> > the join().map().process() chain is more compute intensive.
> >
> >   KStream<String, GenericRecord> views = builder.stream("PageViews")
> >       .map(...)
> >       .filter(...);
> >
> >   KTable<String, GenericRecord> users = builder.table("UserProfile")
> >       .mapValues(...);
> >
> >   KStream<String, Long> regionCount = views
> >       .leftJoin(users, ...)
> >       .map(...)
> >       .process(...);
> >
> >
> If you want to increase the parallelism of just part of the topology, you
> can use a "through()" call with an intermediate topic created by yourself,
> which effectively cuts the topology into disconnected sub-topologies; by
> controlling the number of partitions of this intermediate topic you can
> control the parallelism of the different sub-topologies.
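>
> A rough sketch with your example (assuming you have already created an
> "enriched-views" topic yourself; the topic name is made up):
>
>   KStream<String, GenericRecord> enriched = views
>       .leftJoin(users, ...)
>       // writes to your own topic and reads it back, cutting the topology here;
>       // the downstream sub-topology's parallelism follows this topic's partition count
>       .through("enriched-views");
>
>   enriched
>       .map(...)
>       .process(...);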
>
>
> > I hope I was able to explain my questions clearly enough for you to
> > understand.
> >
> > Thanks,
> > Srikanth
> >
>
>
>
> --
> -- Guozhang
>
