Thanks for the details! I do see a pattern where through() is useful both explicitly and implicitly in the DSL. I guess that fits well with Kafka Streams' design of building on Kafka's strengths.
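For the 4-vs-2 partition join that started this thread, I imagine the
explicit case looks something like the following. This is just a sketch to
confirm my understanding: "views-2p" is a hypothetical topic I would
pre-create myself with 2 partitions, and the String types and the joiner
lambda are placeholders.

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// "PageViews" has 4 partitions, "UserProfile" has 2 -- not co-partitioned.
KStream<String, String> views = builder.stream("PageViews");
KTable<String, String> users = builder.table("UserProfile");

// Routing the stream through the pre-created 2-partition topic
// re-partitions it by key, so the join below sees two inputs with the
// same #partitions.
KStream<String, String> joined = views
    .through("views-2p")                  // hypothetical topic, 2 partitions
    .leftJoin(users, (view, profile) -> view + "," + profile);

I've also put two more sketches below the quoted thread, one for the
config/threading discussion and one for scaling a sub-topology with
through().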
Srikanth

On Fri, May 20, 2016 at 4:38 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hi Srikanth,
>
> I basically agree on (1). We are still working on configuration options
> for Kafka Streams.
>
> For (2), you would get an error. If the number of partitions is not the
> same, the join cannot be computed. There is already a ticket to insert a
> re-partitioning step automatically, in case data is not co-partitioned
> correctly for joins (https://issues.apache.org/jira/browse/KAFKA-3561).
>
> Right now, either both sources need to have the same #partitions, or you
> need to add a ".through()" to get the same number of partitions for both
> join input streams -- as described by Guozhang, using a pre-defined topic
> with the correct #partitions.
>
> >> Won't these replicated internal topics put too much load on brokers?
> >> We have to scale up brokers whenever our stream processing needs
> >> increase. It's easy to add/remove processing nodes to run kstream app
> >> instances, but adding/removing brokers is not that straightforward.
> >
> > This is a good question. We are working on making recommendations
> > about how to easily re-process (possibly after scaling partitions), as
> > we mentioned in Kafka Streams, and Matthias (cc'ed) may be able to
> > come back to you.
>
> I don't think internal replication should become a problem, but we are
> still collecting feedback from users. If you have a Kafka cluster with
> many topics, the broker capacity should be large enough to handle the
> load. Of course, if you start to deploy many apps, you might need to
> consider this for the overall #brokers in your cluster.
>
> About scaling: right now, the parallelism of your Kafka Streams app is
> limited by the number of topic partitions. Thus, the assumption is that
> an app would not easily become a bottleneck. If your load increases, you
> might need to increase the #partitions of the underlying topics anyway;
> thus, there is no additional overhead with regard to processing the data
> in your app.
>
>
> -Matthias
>
>
> On 05/20/2016 04:05 AM, Srikanth wrote:
> > Thanks Guozhang for your reply.
> >
> > I have a few follow-ups based on your response. Writing it inline
> > would have made it hard to read. So here is the extract.
> >
> > 1) *Internal topics use the default retention policy.*
> > Will it be better to add another config for this? Or something like
> > topic.log.retention.hours=_kstreams_internal_:5??
> > I think the retention needs for internal topics are in general going
> > to be very different from those of regular topics.
> >
> > 2) *The internal topic's number of partitions is equal to the number
> > of partitions of the source topic.*
> > I also read in the doc
> > <http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html>:
> > "Since stream joins are performed over the keys of records, it is
> > required that joining streams are co-partitioned by key, i.e., their
> > corresponding *Kafka topics must have the same number of partitions*
> > and partitioned on the same key so that records with the same keys
> > are delivered to the same processing thread."
> >
> > So if I join a KStream from a topic with 4 partitions with a KTable
> > from a topic with 2 partitions, how is the join performed?
> > The source topics have different numbers of partitions, but the
> > internal streams used for the join should have the same number of
> > partitions.
> >
> > Thanks,
> > Srikanth
> >
> > On Thu, May 19, 2016 at 2:29 PM, Guozhang Wang <wangg...@gmail.com
> > <mailto:wangg...@gmail.com>> wrote:
> >
> >     Hello Srikanth,
> >
> >     Thanks for your questions, please see my replies inlined.
> >
> >
> >     On Tue, May 17, 2016 at 7:36 PM, Srikanth <srikanth...@gmail.com
> >     <mailto:srikanth...@gmail.com>> wrote:
> >
> >     > Hi,
> >     >
> >     > I was reading about Kafka Streams and trying to understand its
> >     > programming model. Some observations that I wanted to get some
> >     > clarity on..
> >     >
> >     > 1) Joins & aggregations use an internal topic for the shuffle.
> >     > Source processors will write to this topic with the key used for
> >     > the join; then they are free to commit offsets for that run.
> >     > Does that mean we have to rely on the internal topic's
> >     > replication to guarantee at-least-once processing?
> >
> >     That is right, though users can set the "replication.factor"
> >     config in StreamsConfig to configure the replication.
> >
> >     > Won't these replicated internal topics put too much load on
> >     > brokers? We have to scale up brokers whenever our stream
> >     > processing needs increase. It's easy to add/remove processing
> >     > nodes to run kstream app instances, but adding/removing brokers
> >     > is not that straightforward.
> >
> >     This is a good question. We are working on making recommendations
> >     about how to easily re-process (possibly after scaling
> >     partitions), as we mentioned in Kafka Streams, and Matthias
> >     (cc'ed) may be able to come back to you.
> >
> >     > 2) How many partitions are created for the internal topic? What
> >     > is the retention policy? When/how is it created, and how is it
> >     > cleaned up when the streaming application is shut down
> >     > permanently? Any link with a deep dive into this would be
> >     > helpful.
> >
> >     The number of partitions is currently set to the number of tasks
> >     writing to those topics (with the default retention policy), which
> >     is in turn determined by the source topic; the internal topic is
> >     created upon task initialization. For example, if you have a
> >     topology such as:
> >
> >     source-topic -> sourceNode -> processNode -> sinkNode ->
> >     internal-topic -> sourceNode -> ...
> >
> >     then the internal-topic's number of partitions is equal to the
> >     number of partitions of source-topic.
> >
> >     We are still working on cleaning up such topics when users do not
> >     want to re-process any more; there is a JIRA open for this:
> >     https://issues.apache.org/jira/browse/KAFKA-3185
> >
> >     > 3) I'm a bit confused about the threading model. Let's take the
> >     > example below. Assume "PageViews" and "UserProfile" have four
> >     > partitions each. I start two instances of this app, both with
> >     > two threads, so we have four threads altogether.
> >     > My understanding is that each thread will now receive records
> >     > from one partition of both topics. After reading, they'll do the
> >     > map, filter, etc. and write to the internal topic for the join.
> >     > Then the threads go on to read the next set of records from the
> >     > previous offset. I guess this can be viewed as some sort of
> >     > chaining within a stage.
> >     >
> >     > Now where does the thread that reads from the internal topic
> >     > run? Do they share the same set of threads?
> >
> >     Sub-topologies that are "cut" by the internal topics are
> >     considered not connected, and are assigned to different tasks.
> >
> >     So again, back to this example:
> >
> >     [ source-topic -> sourceNode -> processNode -> sinkNode -> ]
> >     [ internal-topic -> sourceNode -> ... ]
> >
> >     It is translated into two sub-topologies. With four partitions of
> >     the source-topic, it will get 8 tasks in total: 4 of them for the
> >     first sub-topology and the other 4 for the second sub-topology.
> >     But multiple tasks can be hosted on the same thread, so if you
> >     have 4 threads in total, each one will host 2 tasks; the
> >     assignment is done by the library to achieve a shorter restoration
> >     process.
> >
> >     > Can we increase the parallelism for just this processor, if I
> >     > know that the join().map().process() chain is more compute
> >     > intensive?
> >     >
> >     > KStream<String, GenericRecord> views = builder.stream("PageViews")
> >     >     .map(...)
> >     >     .filter(...)
> >     >
> >     > KTable<String, GenericRecord> users = builder.table("UserProfile")
> >     >     .mapValues(...)
> >     >
> >     > KStream<String, Long> regionCount = views
> >     >     .leftJoin(users, ...)
> >     >     .map(...)
> >     >     .process(...)
> >
> >     If you want to increase the parallelism of just part of the
> >     topology, you can use a "through()" call with an intermediate
> >     topic created by yourself, which effectively cuts the topology
> >     into disconnected sub-topologies; by controlling the number of
> >     partitions of this intermediate topic, you can control the
> >     parallelism of the different sub-topologies.
> >
> >     > I hope I was able to explain my questions clearly enough for you
> >     > to understand.
> >     >
> >     > Thanks,
> >     > Srikanth
> >
> >     --
> >     -- Guozhang
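To confirm my understanding of the threading and replication points above,
here is how I'd wire the configs. Again just a sketch: the application id
and broker address are placeholders, and I'm using the plain property names
mentioned in the thread.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;

Properties props = new Properties();
props.put("application.id", "pageview-region");   // placeholder app id
props.put("bootstrap.servers", "broker1:9092");   // placeholder broker

// Replication for the internal re-partitioning/changelog topics, per the
// "replication.factor" StreamsConfig setting Guozhang mentioned.
props.put("replication.factor", "2");

// 2 threads per instance: with 2 app instances and the 8 tasks from the
// example above, each of the 4 threads would host 2 tasks.
props.put("num.stream.threads", "2");

KStreamBuilder builder = new KStreamBuilder();
// ... build the topology as in the examples above ...

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();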
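And for scaling just the compute-heavy join().map().process() chain, I read
the through() suggestion as something like the following. Once more a
sketch: "views-16p" and "users-16p" are hypothetical topics I'd pre-create,
both with 16 partitions so the two join inputs stay co-partitioned, and the
lambdas are placeholders.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// Upstream sub-topology: its parallelism is bounded by the 4 partitions
// of the source topics.
KStream<String, String> views = builder.stream("PageViews")
    .filter((key, view) -> view != null)      // placeholder filter
    .through("views-16p");                    // pre-created, 16 partitions

KTable<String, String> users = builder.table("UserProfile")
    .mapValues(profile -> profile.trim())     // placeholder mapValues
    .through("users-16p");                    // pre-created, 16 partitions

// Downstream sub-topology: the join now gets 16 tasks, one per partition
// of the intermediate topics, so more threads can share the heavy work.
// Note: profile can be null on a left join.
KStream<String, String> regions = views
    .leftJoin(users, (view, profile) -> view + "|" + profile)
    .map((key, value) -> new KeyValue<String, String>(key, value.toUpperCase()));

If I got this right, the partition count of the intermediate topics is the
only knob needed to scale the join sub-topology independently of the
sources.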