Thank you for your reply. I have attached my first attempt at writing a KIP and I was wondering if you could review it and share your thoughts.
Going forward I would like to create this KIP. I was wondering whom I should ask to get the necessary permissions on the wiki.

Username: winkelman.kyle

On Fri, Apr 21, 2017 at 3:15 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> Hi Kyle,
>
> Sorry for the delay in replying. I think it's worth doing a KIP for this one. One super helpful thing with KIPs is to list a few more scenarios that would benefit from this approach. In particular, it seems the main benefit is from reducing the number of state stores. Does this necessarily reduce the number of IOs to the stores (number of puts/gets), or the extra space overhead of multiple stores? Quantifying that a bit would help.
>
> To answer your original questions:
>
> > The problem I am having with this approach is understanding if there is a race condition. Obviously the source topics would be copartitioned. But would it be multithreaded and possibly cause one of the processors to grab patient 1 at the same time a different processor has grabbed patient 1?
>
> I don't think there will be a problem here. A processor cannot be accessed by multiple threads in Kafka Streams.
>
> > My understanding is that for each partition there would be a single complete set of processors and a new incoming record would go completely through the processor topology from a source node to a sink node before the next one is sent through. Is this correct?
>
> This is mostly true; however, if caching is enabled (for deduping, see KIP-63), then a record may reside in a cache before going to the sink. Meanwhile another record can come in, so multiple records can be in the topology at the same time.
>
> Thanks
> Eno
>
> On Fri, Apr 14, 2017 at 8:16 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>> Eno,
>> Thanks for the response. The figure was just a restatement of my questions. I have made an attempt at a low-level processor and it appears to work, but it isn't very pretty, and I was hoping for something at the Streams API level.
>>
>> I have written some code to show an example of how I see the cogroup working in Kafka.
>>
>> First, the KGroupedStream would have a cogroup method that takes the initializer and the aggregator for that specific KGroupedStream. This would return a KCogroupedStream that has two methods: one to add more (KGroupedStream, Aggregator) pairs and one to complete the construction and return a KTable.
>>
>> builder.stream("topic").groupByKey().cogroup(Initializer, Aggregator, aggValueSerde, storeName).cogroup(groupedStream1, Aggregator1).cogroup(groupedStream2, Aggregator2).aggregate();
>>
>> Behind the scenes we create a KStreamAggregate for each (KGroupedStream, Aggregator) pair, then a final pass-through processor to pass on the aggregate values. This gives us a KTable backed by a single store that is used in all of the processors.
>>
>> Please let me know if this is something you think would add value to Kafka Streams, and I will try to create a KIP to foster more communication.
>>
>> You can take a look at what I have. I think it's missing a fair amount, but it's a good start. I took the doAggregate method in KGroupedStream as my starting point and expanded on it for multiple streams:
>> https://github.com/KyleWinkelman/kafka/tree/cogroup
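To make the state-store overhead discussed above a bit more concrete, here is roughly what the current DSL requires for the shopping-site scenario used in the attached draft. This is only a sketch against the current (0.10.2-era) DSL; Cart, WishList, Purchase, Customer, customerSerde, and the Customer update/merge methods are hypothetical types used purely for illustration.

KStreamBuilder builder = new KStreamBuilder();

// One aggregation, and therefore one state store, per input stream.
KTable<String, Customer> cartTable = builder.<String, Cart>stream("carts")
    .groupByKey()
    .aggregate(Customer::new,
               (customerId, cart, customer) -> customer.withCart(cart),
               customerSerde, "cart-store");

KTable<String, Customer> wishListTable = builder.<String, WishList>stream("wish-lists")
    .groupByKey()
    .aggregate(Customer::new,
               (customerId, wishList, customer) -> customer.withWishList(wishList),
               customerSerde, "wish-list-store");

KTable<String, Customer> purchaseTable = builder.<String, Purchase>stream("purchases")
    .groupByKey()
    .aggregate(Customer::new,
               (customerId, purchase, customer) -> customer.withPurchase(purchase),
               customerSerde, "purchase-store");

// Three state stores so far, plus a chain of ValueJoiners that is re-evaluated
// whenever any one of the inputs changes (Customer.merge is assumed to tolerate
// nulls, since an outer join can pass null for either side).
KTable<String, Customer> customerTable = cartTable
    .outerJoin(wishListTable, (cartSide, wishSide) -> Customer.merge(cartSide, wishSide))
    .outerJoin(purchaseTable, (joined, purchaseSide) -> Customer.merge(joined, purchaseSide));

Every new record has to funnel through that whole join chain before the final Customer value is produced, which is the overhead the cogroup proposal in the attached draft tries to remove.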
KIP-? - Kafka Streams Cogroup

Status

Current state: Under Discussion
Discussion thread: here
JIRA: here
Released: <Kafka Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When multiple streams aggregate together to form a single large object (e.g. a shopping website may have a cart stream, a wish list stream, and a purchases stream that together make up a Customer), it is very difficult to accommodate this in the Kafka Streams DSL. It generally requires you to group and aggregate all of the streams into KTables and then make multiple outerJoin calls to end up with a KTable of your desired object. This creates a state store for each stream and a long chain of ValueJoiners that every new record must pass through to reach the final object. Adding a cogroup method that uses a single state store will:

1. Reduce the number of gets from state stores. With the multiple joins, when a new value arrives on any of the streams a chain reaction happens in which ValueGetters keep calling ValueGetters until all of the state stores have been accessed.
2. Slightly improve performance. As described above, all ValueGetters are called, which in turn causes all ValueJoiners to be called, forcing a recalculation of the current joined value of all of the other streams.

Public Interfaces

KGroupedStream {
    // Possibly add support for Windows and Sessions as well.
    ...
    <T> KCogroupedStream<K, T> cogroup(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
                                       final Serde<T> aggValueSerde,
                                       final String storeName);

    <T> KCogroupedStream<K, T> cogroup(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
                                       final StateStoreSupplier<?> storeSupplier);
}

public interface KCogroupedStream<K, V> {

    <T> KCogroupedStream<K, V> cogroup(KGroupedStream<K, T> groupedStream,
                                       Aggregator<? super K, ? super T, V> aggregator);

    KTable<K, V> aggregate();
}

Expected use:

KTable<K, T> cogroupedTable =
    groupedStream1.cogroup(initializer, aggregator1, aggValueSerde, "aggValue")
                  .cogroup(groupedStream2, aggregator2)
                  .cogroup(groupedStream3, aggregator3)
                  ...
                  .cogroup(groupedStreamN, aggregatorN)
                  .aggregate();

(A fuller sketch of this usage with concrete example aggregators is included after the end of this draft.)

Proposed Changes

1. Construct the above Public Interfaces.
2. Create an internal.KCogroupedStreamImpl that keeps track of the StateStoreSupplier, the Initializer, and the pairs of (KGroupedStream, Aggregator).
3. Model the aggregate method of internal.KCogroupedStreamImpl after the doAggregate method of KGroupedStream: force each KGroupedStream to repartitionIfRequired and add a KStreamAggregate processor for each KGroupedStream. Additionally, add a KStreamCogroup processor, ensure all sources are copartitioned, and give all processors access to the shared state store.
4. Create a KStreamCogroup that passes through all outputs from the KStreamAggregates. KStreamCogroup must also be a KStreamAggProcessorSupplier; it keeps track of all of its parent KStreamAggregates in case it needs to enableSendingOldValues, and it can have one of them create a KTableValueGetterSupplier if view is called.

Compatibility, Deprecation, and Migration Plan

Users must upgrade to the new version if they want to use this functionality.

Test Plan

An integration test similar to KStreamAggregationIntegrationTest.

Rejected Alternatives
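For reference, here is a slightly fuller sketch of the Expected use above, spelling out how each cogrouped stream contributes its own Aggregator while sharing a single Initializer and a single state store. As in the earlier sketch in this mail, Cart, WishList, Purchase, Customer, customerSerde, and the Customer update methods are hypothetical illustration types, and the cogroup/aggregate calls are the interfaces proposed by this KIP rather than an existing API.

KGroupedStream<String, Cart> carts = builder.<String, Cart>stream("carts").groupByKey();
KGroupedStream<String, WishList> wishLists = builder.<String, WishList>stream("wish-lists").groupByKey();
KGroupedStream<String, Purchase> purchases = builder.<String, Purchase>stream("purchases").groupByKey();

KTable<String, Customer> customerTable = carts
    // The first cogroup call supplies the shared Initializer, Serde, and the
    // name of the single state store used by every stream in the cogroup.
    .cogroup(Customer::new,
             (customerId, cart, customer) -> customer.withCart(cart),
             customerSerde, "customer-store")
    // Each additional stream only contributes its own Aggregator and reuses
    // the shared Initializer and store.
    .cogroup(wishLists, (customerId, wishList, customer) -> customer.withWishList(wishList))
    .cogroup(purchases, (customerId, purchase, customer) -> customer.withPurchase(purchase))
    // aggregate() closes the cogroup and returns a KTable backed by that one store.
    .aggregate();

Compared with the outer-join chain sketched earlier in this mail, only one state store is created, and an update from any of the inputs goes through a single aggregator rather than the whole ValueJoiner chain.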