Hello, I am wondering if there is any way to aggregate many streams together at once to build a larger object. Example (healthcare domain): I have streams of Medical, Pharmacy, and Lab claims. The key is PatientId; the value is a different Avro record for each stream. I was hoping there was a way to supply a single initializer, () -> new Patient(), and 3 aggregators, (key, value, patient) -> patient.add******Claim(value).
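To make the idea concrete, here is a minimal plain-Java sketch (no Kafka dependencies) of the cogroup semantics described above: one initializer plus one typed aggregator per input stream, all updating a single shared "state store" (modeled as a Map). Patient, the claim values, and all names here are hypothetical stand-ins, not a real API.

```java
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class CogroupSketch {
    // Stand-in for the larger object being built up from several streams.
    static class Patient {
        final List<String> medical = new ArrayList<>();
        final List<String> pharmacy = new ArrayList<>();
        final List<String> lab = new ArrayList<>();
    }

    // A single backing store shared by every aggregator.
    static final Map<String, Patient> store = new HashMap<>();
    static final Supplier<Patient> initializer = Patient::new;

    // Generic helper: look up (or initialize) the aggregate for the key,
    // apply the stream-specific aggregator, and write the result back.
    static <V> Patient aggregate(String key, V value, BiFunction<Patient, V, Patient> agg) {
        Patient updated = agg.apply(store.computeIfAbsent(key, k -> initializer.get()), value);
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        // Records arriving from three copartitioned streams, keyed by patient id.
        aggregate("patient-1", "medClaim-A", (p, v) -> { p.medical.add(v);  return p; });
        aggregate("patient-1", "rxClaim-B",  (p, v) -> { p.pharmacy.add(v); return p; });
        aggregate("patient-1", "labClaim-C", (p, v) -> { p.lab.add(v);      return p; });
        Patient p = store.get("patient-1");
        System.out.println(p.medical + " " + p.pharmacy + " " + p.lab);
        // prints: [medClaim-A] [rxClaim-B] [labClaim-C]
    }
}
```

The point of the sketch is that only one store exists regardless of how many input streams there are; each stream contributes its own aggregator but shares the initializer and the store.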
Currently the only way I see to implement this use case is to aggregate each individual stream and then join the results. This doesn't scale well with a large number of input streams, because each stream requires its own state store. I was hoping to get thoughts on a KCogroupedStream API; I have spent a little time conceptualizing it.

Approach 1: Add a cogroup method to KGroupedStream that takes the single initializer, a list of the other KGroupedStreams, and a list of the corresponding aggregators. Everything would then flow through a single processor with a single backing state store. The aggregator a record is routed to is determined by context().topic(), which we should be able to trace back to one of the KGroupedStreams in the list. The problem I am having with this approach is that, because everything goes through the single processor and Java's generics are limited by type erasure, I either have to pass in a list of Type objects for casting each record before sending it to its aggregator, or create aggregators that accept Object and cast to the appropriate type internally.

Approach 2: Create one processor per aggregator, all sharing a single state store, followed by a single KStreamPassThrough that just forwards the new aggregate value. The positive here is that each processor knows which stream its records come from, so the context().topic() trick is unnecessary. The problem I am having with this approach is understanding whether there is a race condition. Obviously the source topics would be copartitioned, but could processing be multithreaded, such that one processor grabs patient 1 at the same time a different processor has grabbed patient 1? My understanding is that each partition has a single complete set of processors, and a new incoming record goes all the way through the processor topology from a source node to a sink node before the next record is sent through. Is this correct?
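To illustrate the typing problem in Approach 1, here is a small plain-Java sketch: because one processor handles records from all source topics, the shared value type degrades to Object, and each per-stream aggregator has to cast. The topic names and claim classes are made up for illustration; the topic-to-aggregator map stands in for the context().topic() dispatch.

```java
import java.util.*;
import java.util.function.BiFunction;

public class SingleProcessorSketch {
    static class MedicalClaim { final String id; MedicalClaim(String id) { this.id = id; } }
    static class LabClaim     { final String id; LabClaim(String id)     { this.id = id; } }

    // Aggregators are forced to accept Object and cast internally -- this is
    // the unchecked part that the single-processor design imposes.
    static final Map<String, BiFunction<List<String>, Object, List<String>>> byTopic = Map.of(
        "medical-claims", (agg, v) -> { agg.add("med:" + ((MedicalClaim) v).id); return agg; },
        "lab-claims",     (agg, v) -> { agg.add("lab:" + ((LabClaim) v).id);     return agg; }
    );

    // Stand-in for the single processor: route by source topic, then apply
    // the matching aggregator to the current aggregate.
    static List<String> process(String topic, Object value, List<String> aggregate) {
        return byTopic.get(topic).apply(aggregate, value);
    }
}
```

The alternative mentioned above, passing a list of Type (or Class) objects alongside the aggregators, just moves the cast out of the aggregator and into the processor; either way the single-processor design loses compile-time type safety.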
If anyone has any additional ideas about this let me know. I don't know if I have the time to actually create this api so if someone likes the idea and wants to develop it feel free.