Hi Kyle,

Sorry for the delay in replying. I think it's worth doing a KIP for this one. One super helpful thing with KIPs is to list a few more scenarios that would benefit from this approach. In particular, it seems the main benefit comes from reducing the number of state stores. Does this necessarily reduce the number of IOs to the stores (number of puts/gets), or the extra space overhead of having multiple stores? Quantifying that a bit would help.
To answer your original questions:

> The problem I am having with this approach is understanding if there is a race condition. Obviously the source topics would be copartitioned. But would it be multithreaded and possibly cause one of the processors to grab patient 1 at the same time a different processor has grabbed patient 1?

I don't think there will be a problem here. A processor cannot be accessed by multiple threads in Kafka Streams. Because the source topics are copartitioned, all records for patient 1 end up in the same stream task, and each task (with its processors) is run by exactly one thread.

> My understanding is that for each partition there would be a single complete set of processors and a new incoming record would go completely through the processor topology from a source node to a sink node before the next one is sent through. Is this correct?

This is mostly true. However, if caching is enabled (for deduplication, see KIP-63), a record may reside in a cache before going to the sink. Meanwhile, another record can come in, so multiple records can be in the topology at the same time.

Thanks
Eno

On Fri, Apr 14, 2017 at 8:16 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

> Eno,
>
> Thanks for the response. The figure was just a restatement of my questions. I have made an attempt with a low-level processor, and it appears to work, but it isn't very pretty; I was hoping for something at the Streams API level.
>
> I have written some code to show an example of how I see cogroup working in Kafka.
>
> First, KGroupedStream would have a cogroup method that takes the initializer and the aggregator for that specific KGroupedStream. This would return a KCogroupedStream that has two methods: one to add more (KGroupedStream, Aggregator) pairs, and one to complete the construction and return a KTable.
>
>     builder.stream("topic").groupByKey()
>         .cogroup(Initializer, Aggregator, aggValueSerde, storeName)
>         .cogroup(groupedStream1, Aggregator1)
>         .cogroup(groupedStream2, Aggregator2)
>         .aggregate();
>
> Behind the scenes, we create a KStreamAggregate for each (KGroupedStream, Aggregator) pair, plus a final pass-through processor to pass on the aggregate values. This gives us a KTable backed by a single store that is used by all of the processors.
>
> Please let me know if this is something you think would add value to Kafka Streams, and I will try to create a KIP to foster more communication.
>
> You can take a look at what I have. I think it's missing a fair amount, but it's a good start. I took the doAggregate method in KGroupedStream as my starting point and expanded on it for multiple streams:
> https://github.com/KyleWinkelman/kafka/tree/cogroup
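To make the proposed cogroup data flow concrete, here is a minimal, dependency-free Java sketch. It is not the Kafka Streams API. The names CogroupSketch, SketchAggregator, process and forwarded are hypothetical, and values are plain Strings to keep it short. Each registered (stream, aggregator) pair stands in for one KStreamAggregate. All pairs read and write one shared map, which stands in for the single state store. Every updated aggregate is appended to a list that plays the role of the final pass-through processor. Records are handled one at a time, in the same way a single stream task is driven by one thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical aggregator shape: (key, value, aggregate) -> new aggregate.
// Loosely mirrors Kafka's Aggregator, but defined here so the sketch has no dependencies.
interface SketchAggregator<A> {
    A apply(String key, String value, A aggregate);
}

// Hypothetical model of the proposed cogroup: one aggregator per input stream,
// all sharing a single store. Not the Kafka Streams API.
final class CogroupSketch<A> {
    private final Supplier<A> initializer;
    // streamName -> aggregator; each entry plays the role of one KStreamAggregate.
    private final Map<String, SketchAggregator<A>> aggregators = new HashMap<>();
    // The single key-value store shared by all per-stream aggregators.
    private final Map<String, A> store = new HashMap<>();
    // Stand-in for the final pass-through processor's downstream output.
    private final List<String> forwarded = new ArrayList<>();

    CogroupSketch(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    // Register another (grouped stream, aggregator) pair; returns this for chaining.
    CogroupSketch<A> cogroup(String streamName, SketchAggregator<A> aggregator) {
        aggregators.put(streamName, aggregator);
        return this;
    }

    // Handle one record from the named stream: read, aggregate, write, forward.
    void process(String streamName, String key, String value) {
        SketchAggregator<A> aggregator = aggregators.get(streamName);
        if (aggregator == null) {
            throw new IllegalArgumentException("no aggregator registered for " + streamName);
        }
        A current = store.get(key);
        if (current == null) {
            current = initializer.get(); // first record for this key
        }
        A updated = aggregator.apply(key, value, current);
        store.put(key, updated);              // same store, whichever stream the record came from
        forwarded.add(key + "=" + updated);   // pass-through: emit the new aggregate
    }

    A get(String key) {
        return store.get(key);
    }

    List<String> forwarded() {
        return forwarded;
    }
}
```

As an example, register a "visits" aggregator and a "labs" aggregator that each append to a String. After processing a visit record and a lab record for patient1, the single store holds "visit:v1;lab:l1;" for that key. In this model there is one combined aggregate per key in one store. The alternative is one store per input stream, with the results joined afterwards.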
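The caching behavior mentioned in the answer about records flowing from source to sink (KIP-63) can be illustrated with a simplified model. This is an assumption-laden sketch, not Kafka's record cache. The class DedupCacheSketch and its methods are hypothetical, and it leaves out size-based eviction and commit scheduling. The sketch keeps only the latest value per key and forwards to the sink only on flush. As a result, several records can sit between source and sink at once, and repeated updates to one key collapse into a single downstream record. In Kafka Streams itself, the cache size is set with the cache.max.bytes.buffering config (0 disables caching). Cached records are forwarded on commit or when the cache has to evict.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified dedup cache in front of a sink (in the spirit of KIP-63).
// Not Kafka's implementation: no byte-size limit, no eviction, flush is manual.
final class DedupCacheSketch {
    // Insertion-ordered: re-putting an existing key keeps its original position.
    private final Map<String, Integer> dirty = new LinkedHashMap<>();
    private final List<String> sink = new ArrayList<>();

    // A newer value for a key overwrites the cached, not-yet-forwarded one.
    void put(String key, int value) {
        dirty.put(key, value);
    }

    // Records held in the cache: inside the topology but not yet at the sink.
    int inFlight() {
        return dirty.size();
    }

    // Forward only the latest value per key, then clear (a stand-in for a commit).
    void flush() {
        for (Map.Entry<String, Integer> e : dirty.entrySet()) {
            sink.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    List<String> sink() {
        return sink;
    }
}
```

With updates patient1=1, patient2=5, patient1=2, two keys are held in the cache and nothing has reached the sink yet. After flush, the sink receives patient1=2 and patient2=5.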