Hi Srikanth,

I basically agree on (1). We are still working on configuration options
for Kafka Streams.
For (2), you would get an error. If the number of partitions is not the
same, the join cannot be computed. There is already a ticket to insert a
re-partitioning step automatically, in case data is not co-partitioned
correctly for joins (https://issues.apache.org/jira/browse/KAFKA-3561).
Right now, either both sources must have the same #partitions, or you
need to add a ".through()" to get the same number of partitions for both
join input streams -- as described by Guozhang, using a pre-defined
topic with the correct #partitions.
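To make that concrete, here is a minimal sketch of the ".through()"
workaround, written against the original KStreamBuilder API. Topic
names, partition counts, and serdes are made up for the example;
"UserProfile-4p" would have to be created manually beforehand with the
same number of partitions (4) as "PageViews":

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class CoPartitionedJoin {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "copartition-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // 0.10.0.x also needs ZooKeeper for internal topic creation:
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
                      Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
                      Serdes.String().getClass().getName());

            KStreamBuilder builder = new KStreamBuilder();

            // "PageViews" has 4 partitions, keyed by user id.
            KStream<String, String> views = builder.stream("PageViews");

            // "UserProfile" has only 2 partitions; routing it through a
            // pre-created 4-partition topic makes both join inputs
            // co-partitioned by key.
            KTable<String, String> users =
                builder.table("UserProfile").through("UserProfile-4p");

            // KStream-KTable join; both sides now have 4 partitions.
            views.leftJoin(users, (view, profile) -> view + "," + profile)
                 .to("EnrichedPageViews");

            new KafkaStreams(builder, props).start();
        }
    }

The extra hop through the intermediate topic costs one additional write
and read, but it guarantees that both join inputs are partitioned on the
same key over the same number of partitions.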
>> Won't these replicated internal topics put too much load on brokers?
>> We have to scale up brokers whenever our stream processing needs
>> increase. It's easy to add/remove processing nodes to run kstream app
>> instances, but adding/removing brokers is not that straightforward.
>
> This is a good question, we are working on making recommendations about
> how to easily re-process (possibly after scaling partitions) as we
> mentioned in Kafka Streams, and Matthias (cc'ed) may be able to come
> back to you.

I don't think internal replication should become a problem, but we are
still collecting feedback from users. If you have a Kafka cluster with
many topics, the broker capacity should be large enough to handle the
load. Of course, if you start to deploy many apps, you might need to
consider this for the overall #brokers in your cluster.

About scaling: right now, the parallelism of your Kafka Streams app is
limited by the number of topic partitions. Thus, the assumption is that
an app would not easily become a bottleneck. If your load increases, you
might need to increase the #partitions of the underlying topics anyway;
thus, there is no additional overhead with regard to processing the data
in your app.

-Matthias

On 05/20/2016 04:05 AM, Srikanth wrote:
> Thanks Guozhang for your reply.
>
> I have a few follow-ups based on your response. Writing it inline would
> have made it hard to read, so here is the extract.
>
> 1) *Internal topics use the default retention policy.*
> Would it be better to add another config for this? Or something like
> topic.log.retention.hours=_kstreams_internal_:5??
> I think the retention needs for internal topics are in general going to
> be very different from regular topics.
>
> 2) *The internal topic's number of partitions is equal to the number of
> partitions of the source topic.*
> I also read in the doc
> <http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html>:
> "Since stream joins are performed over the keys of records, it is
> required that joining streams are co-partitioned by key, i.e., their
> corresponding Kafka topics must have the same number of partitions and
> be partitioned on the same key so that records with the same keys are
> delivered to the same processing thread."
>
> So if I join a KStream from a topic with 4 partitions with a KTable
> from a topic with 2 partitions, how is the join performed?
> The source topics have different numbers of partitions, but the
> internal streams used for the join should have the same number of
> partitions.
>
> Thanks,
> Srikanth
>
> On Thu, May 19, 2016 at 2:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> Hello Srikanth,
>
> Thanks for your questions, please see replies inlined.
>
> On Tue, May 17, 2016 at 7:36 PM, Srikanth <srikanth...@gmail.com> wrote:
>
> > Hi,
> >
> > I was reading about Kafka Streams and trying to understand its
> > programming model.
> > Some observations that I wanted to get some clarity on...
> >
> > 1) Joins & aggregations use an internal topic for the shuffle. Source
> > processors will write to this topic with the key used for the join;
> > then it is free to commit the offset for that run.
> > Does that mean we have to rely on the internal topic's replication to
> > guarantee at-least-once processing?
>
> That is right, and users can set the config of "replication.factor" in
> StreamsConfig to configure the replication, though.
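As a purely illustrative sketch of that config (the application id and
broker address here are placeholders):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class InternalTopicReplication {

        static Properties streamsConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // "replication.factor" applies only to the internal topics that
            // Kafka Streams itself creates (re-partitioning and changelog
            // topics); the default is 1, i.e. no replication.
            props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
            return props;
        }
    }

Note that this setting does not change the replication factor of
user-created source or sink topics.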
> > Won't these replicated internal topics put too much load on brokers?
> > We have to scale up brokers whenever our stream processing needs
> > increase. It's easy to add/remove processing nodes to run kstream app
> > instances, but adding/removing brokers is not that straightforward.
>
> This is a good question, we are working on making recommendations about
> how to easily re-process (possibly after scaling partitions) as we
> mentioned in Kafka Streams, and Matthias (cc'ed) may be able to come
> back to you.
>
> > 2) How many partitions are created for the internal topic? What is
> > the retention policy?
> > When/how is it created, and how is it cleaned up when the streaming
> > application is shut down permanently?
> > Any link with a deep dive into this will be helpful.
>
> The number of partitions is currently set to the number of tasks
> writing to those topics (with the default retention policy), which in
> turn is determined by the source topic; the internal topic is created
> upon task initialization.
> For example, if you have a topology as:
>
> source-topic -> sourceNode -> processNode -> sinkNode ->
> internal-topic -> sourceNode -> ...
>
> then the internal topic's number of partitions is equal to the number
> of partitions of source-topic.
>
> We are still working on cleaning up such topics if users do not want
> to re-process any more; there is a JIRA open for this:
> https://issues.apache.org/jira/browse/KAFKA-3185
>
> > 3) I'm a bit confused about the threading model. Let's take the
> > example below. Assume "PageViews" and "UserProfile" have four
> > partitions each.
> > I start two instances of this app, both with two threads. So we have
> > four threads all together.
> > My understanding is that each thread will now receive records from
> > one partition in both topics. After reading, they'll do the map,
> > filter, etc. and write to the internal topic for the join.
> > Then the threads go on to read the next set of records from the
> > previous offset. I guess this can be viewed as some sort of chaining
> > within a stage.
> >
> > Now where does the thread that reads from the internal topic run? Do
> > they share the same set of threads?
>
> Sub-topologies that are "cut" by the internal topics are considered not
> connected, and are assigned to different tasks.
>
> So again, back to this example:
>
> [ source-topic -> sourceNode -> processNode -> sinkNode -> ]
> [ internal-topic -> sourceNode -> ... ]
>
> It is translated into two sub-topologies. With four partitions of the
> source-topic, it will get 8 tasks in total: 4 of them for the first
> sub-topology and the other 4 for the second sub-topology. But multiple
> tasks can be hosted on the same thread, so if you have 4 threads in
> total, each one will host 2 tasks; the assignment is done by the
> library to achieve a shorter restoration process.
>
> > Can we increase the parallelism for just this processor? If I know
> > that the join().map().process() chain is more compute intensive:
> >
> > KStream<String, GenericRecord> views = builder.stream("PageViews")
> >     .map(...)
> >     .filter(...)
> >
> > KTable<String, GenericRecord> users = builder.table("UserProfile")
> >     .mapValues(...)
> >
> > KStream<String, Long> regionCount = views
> >     .leftJoin(users, ...)
> >     .map(...)
> >     .process(...)
>
> If you want to increase the parallelism of just part of the topology,
> you can use a "through()" call with an intermediate topic created by
> yourself, which effectively cuts the topology into disconnected
> sub-topologies; by controlling the number of partitions of this
> intermediate topic, you can control the parallelism of the different
> sub-topologies.
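To tie the threading and parallelism points together, here is a small
sketch of the per-instance knob, "num.stream.threads"; the arithmetic in
the comments follows the 4-partition example above, and the application
id and broker address are placeholders:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class StreamThreads {

        static Properties streamsConfig() {
            // Example math: 4 source partitions and 2 sub-topologies give
            // 8 tasks in total. Two app instances with 2 threads each make
            // 4 threads, so each thread hosts 2 tasks. More threads (or
            // instances) add parallelism only up to the total task count,
            // which is bounded by the partition counts of the topics.
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pageview-join-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); // default: 1
            return props;
        }
    }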
> > I hope I was able to explain my questions clearly enough for you to
> > understand.
> >
> > Thanks,
> > Srikanth
>
> --
> -- Guozhang