Thanks for the details!

I do see a pattern where through() is useful both explicitly and implicitly
via the DSL. I guess that fits well with Kafka Streams' design of utilizing
Kafka's strengths.

Srikanth

On Fri, May 20, 2016 at 4:38 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi Srikanth,
>
> I basically agree on (1). We are still working on configuration options
> for Kafka Streams.
>
> For (2), you would get an error. If the number of partitions is not the
> same, the join cannot be computed. There is already a ticket to insert a
> re-partitioning step automatically, in case data is not co-partitioned
> correctly for joins (https://issues.apache.org/jira/browse/KAFKA-3561)
>
> Right now, either both sources must have the same #partitions or you need
> to add a ".through()" to get the same number of partitions for both join
> input streams -- as described by Guozhang, using a pre-defined topic with
> the correct #partitions.
>
> >> Won't these replicated internal topics put too much load on brokers?
> >> We have to scale up brokers whenever our stream processing needs
> >> increase. It's easy to add/remove processing nodes to run a KStream app
> >> instance but adding/removing brokers is not that straightforward.
> >
> > This is a good question; we are working on making recommendations about
> > how to easily re-process (possibly after scaling partitions) as we
> > mentioned in Kafka Streams, and Matthias (cc'ed) may be able to come
> > back to you.
>
> I don't think internal replication should become a problem. But we are
> still collecting feedback from users. If you have a Kafka cluster with
> many topics, the broker capacity should be large enough to handle the
> load. Of course, if you start to deploy many apps, you might need to
> consider this for your overall #brokers in your cluster.
>
> About scaling: right now, the parallelism of your Kafka Streams app is
> limited by the number of topic partitions. Thus, the assumption is that
> an app would not easily become a bottleneck. If your load increases,
> you might need to increase the #partitions of the underlying topics
> anyway; thus, there is no additional overhead with regard to processing
> the data in your app.
>
>
> -Matthias
>
>
> On 05/20/2016 04:05 AM, Srikanth wrote:
> > Thanks Guozhang for your reply.
> >
> > I have a few follow-ups based on your response. Writing it inline would
> > have made it hard to read, so here is the extract:
> >
> > 1) */Internal topics use default retention policy/*.
> > Would it be better to add another config for this? Or something like
> > topic.log.retention.hours=_kstreams_internal_:5??
> > I think retention needs for internal topics in general are going to be
> > very different from those of regular topics.
> >
> > 2) */Internal-topic's number of partitions is equal to the number of
> > partitions of source-topic./*
> > I also read in the doc
> > <http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html>
> > /"Since stream joins are performed over the keys of records, it is
> > required that joining streams are co-partitioned by key,/
> > /i.e., their corresponding *Kafka topics must have the same number of
> > partitions* and partitioned on the same key so that records with the
> > same keys are delivered to the same processing thread."/
> >
> > So if I join a KStream from a topic with 4 partitions with a KTable from a
> > topic with 2 partitions, how is the join performed?
> > The source topics have different no. of partitions, but the internal stream
> > used for the join should have the same no. of partitions.
> >
> > Thanks,
> > Srikanth
> >
> > On Thu, May 19, 2016 at 2:29 PM, Guozhang Wang <wangg...@gmail.com
> > <mailto:wangg...@gmail.com>> wrote:
> >
> >     Hello Srikanth,
> >
> >     Thanks for your questions, please see replies inlined.
> >
> >
> >     On Tue, May 17, 2016 at 7:36 PM, Srikanth <srikanth...@gmail.com
> >     <mailto:srikanth...@gmail.com>> wrote:
> >
> >     > Hi,
> >     >
> >     > I was reading about Kafka Streams and trying to understand its
> >     > programming model.
> >     > Some observations that I wanted to get some clarity on:
> >     >
> >     > 1) Joins & aggregations use an internal topic for the shuffle.
> >     > Source processors will write to this topic with the key used for
> >     > the join. Then it is free to commit the offset for that run.
> >     > Does that mean we have to rely on the internal topic's replication
> >     > to guarantee at-least-once processing?
> >     >
> >     That is right, and users can set the "replication.factor" config in
> >     StreamsConfig to configure the replication of those internal topics.
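> >
> >     A minimal sketch of setting it (application id and broker address
> >     are hypothetical):
> >
> >       Properties props = new Properties();
> >       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
> >       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >       // replicate internal (re-partition / changelog) topics 3 ways
> >       props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);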
> >
> >
> >
> >     > Won't these replicated internal topics put too much load on
> >     > brokers? We have to scale up brokers whenever our stream processing
> >     > needs increase. It's easy to add/remove processing nodes to run a
> >     > KStream app instance but adding/removing brokers is not that
> >     > straightforward.
> >     >
> >     This is a good question; we are working on making recommendations
> >     about how to easily re-process (possibly after scaling partitions),
> >     as we mentioned in Kafka Streams, and Matthias (cc'ed) may be able
> >     to come back to you.
> >
> >
> >     >
> >     > 2) How many partitions are created for the internal topic? What is
> >     > the retention policy?
> >     > When/how is it created, and how is it cleaned up when the streaming
> >     > application is shut down permanently?
> >     > Any link with a deep dive into this would be helpful.
> >     >
> >     The number of partitions is currently set to the number of tasks
> >     writing to those topics (with the default retention policy), which is
> >     in turn determined by the source topic, and the topic is created upon
> >     task initialization.
> >     For example, if you have a topology as:
> >
> >     source-topic -> sourceNode -> processNode -> sinkNode ->
> >     internal-topic -> sourceNode -> ...
> >
> >     Then the internal-topic's number of partitions is equal to the
> >     number of partitions of source-topic.
> >
> >     We are still working on cleaning up such topics if users do not want
> >     to re-process any more; there is a JIRA open for this:
> >     https://issues.apache.org/jira/browse/KAFKA-3185
> >
> >
> >     >
> >     > 3) I'm a bit confused about the threading model. Let's take the
> >     > below example.
> >     > Assume "PageViews" and "UserProfile" have four partitions each.
> >     > I start two instances of this app, both with two threads. So we
> >     > have four threads altogether.
> >     > My understanding is that each thread will now receive records from
> >     > one partition in both topics. After reading, they'll do the map,
> >     > filter, etc., and write to the internal topic for the join.
> >     > Then the threads go on to read the next set of records from the
> >     > previous offset. I guess this can be viewed as some sort of
> >     > chaining within a stage.
> >     >
> >     > Now where does the thread that reads from the internal topic run?
> >     > Do they share the same set of threads?
> >     >
> >
> >     Sub-topologies that are "cut" by the internal topics are considered
> not
> >     connected, and are assigned to different tasks.
> >
> >     So again back to this example:
> >
> >     [ source-topic -> sourceNode -> processNode -> sinkNode -> ]
> >     [ internal-topic -> sourceNode -> ... ]
> >
> >     It is translated into two sub-topologies. With four partitions of
> >     the source-topic, you will get 8 tasks in total: 4 of them for the
> >     first sub-topology and the other 4 for the second sub-topology. But
> >     multiple tasks can be hosted on the same thread, so if you have 4
> >     threads in total, each one will host 2 tasks; the assignment is done
> >     by the library to achieve a shorter restoration process.
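> >
> >     To spell out the arithmetic: 2 instances x 2 threads each = 4
> >     threads, hosting the 8 tasks. A minimal sketch of the per-instance
> >     thread setting (values illustrative):
> >
> >       Properties props = new Properties();
> >       // 2 threads per instance; with 2 instances -> 4 threads, 2 tasks each
> >       props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);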
> >
> >     > Can we increase the parallelism for just this processor? If I know
> >     > that the join().map().process() chain is more compute intensive.
> >     >
> >     >   KStream<String, GenericRecord> views = builder.stream("PageViews")
> >     >       .map(...)
> >     >       .filter(...)
> >     >
> >     >   KTable<String, GenericRecord> users = builder.table("UserProfile")
> >     >       .mapValues(...)
> >     >
> >     >   KStream<String, Long> regionCount = views
> >     >       .leftJoin(users, ...)
> >     >       .map(...)
> >     >       .process(...)
> >     >
> >     >
> >     If you want to just increase the parallelism of part of the topology,
> >     you can use a "through()" call with an intermediate topic created by
> >     yourself, which effectively cuts the topology into disconnected
> >     sub-topologies; by controlling the number of partitions of this
> >     intermediate topic you can control the parallelism of the different
> >     sub-topologies.
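> >
> >     For illustration, a minimal sketch of that idea (topic names and
> >     partition counts are hypothetical; the mapValues step stands in for
> >     the compute-intensive part):
> >
> >       // "PageViews" has 4 partitions; "views-repartition" was
> >       // pre-created with 8 partitions
> >       KStream<String, String> raw = builder.stream("PageViews");   // 4 tasks
> >       KStream<String, String> views = raw.through("views-repartition");
> >
> >       // this sub-topology now runs with 8 tasks
> >       views.mapValues(v -> v.toUpperCase())  // stand-in heavy step
> >            .to("TransformedViews");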
> >
> >
> >     > I hope I was able to explain my questions clearly enough for you
> >     > to understand.
> >     >
> >     > Thanks,
> >     > Srikanth
> >     >
> >
> >
> >
> >     --
> >     -- Guozhang
> >
> >
>
>
