Hello,

I am wondering if there is any way to aggregate together many streams at once 
to build a larger object. Example (Healthcare Domain):
I have streams of Medical, Pharmacy, and Lab claims. Key is PatientId, Value is 
a different Avro Record for each stream.
I was hoping there was a way to supply a single Initializer, () -> new 
Patient(), and 3 aggregators, (key, value, patient) -> 
patient.add******Claim(value).
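
To make the shape concrete, here is a minimal plain-Java sketch of what I have in mind. It deliberately does not use the Kafka Streams API. `Patient`, `ClaimAggregator`, and `CogroupSketch` are hypothetical names, and the String claims stand in for the real Avro records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical aggregate; the real claim values would be Avro records, not Strings.
class Patient {
    final List<String> medicalClaims = new ArrayList<>();
    final List<String> pharmacyClaims = new ArrayList<>();
    final List<String> labClaims = new ArrayList<>();

    Patient addMedicalClaim(String claim) { medicalClaims.add(claim); return this; }
    Patient addPharmacyClaim(String claim) { pharmacyClaims.add(claim); return this; }
    Patient addLabClaim(String claim) { labClaims.add(claim); return this; }
}

// Same shape as the (key, value, patient) -> patient aggregators described above.
interface ClaimAggregator<V> {
    Patient apply(String patientId, V value, Patient patient);
}

// Models the idea only: one initializer and one store shared by every input stream.
class CogroupSketch {
    private final Map<String, Patient> store = new HashMap<>();
    private final Supplier<Patient> initializer;

    CogroupSketch(Supplier<Patient> initializer) {
        this.initializer = initializer;
    }

    // Each input stream supplies its own aggregator; all of them update the same Patient.
    <V> void process(String patientId, V claim, ClaimAggregator<V> aggregator) {
        Patient current = store.containsKey(patientId) ? store.get(patientId) : initializer.get();
        store.put(patientId, aggregator.apply(patientId, claim, current));
    }

    Patient get(String patientId) {
        return store.get(patientId);
    }
}
```

With this model, the three streams would call `process` with `(k, v, p) -> p.addMedicalClaim(v)`, `(k, v, p) -> p.addPharmacyClaim(v)`, and `(k, v, p) -> p.addLabClaim(v)` respectively, all against the same store.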

Currently the only way that I see to do the above use case is by aggregating 
each individual stream then joining them. This doesn't scale well with a large 
number of input streams because for each stream I would be creating another 
state store.

I was hoping to get thoughts on a KCogroupedStream api. I have spent a little 
time conceptualizing it.

Approach 1:
In KGroupedStream add a cogroup method that takes the single initializer, a
list of other KGroupedStreams, and a list of other aggregators.
This would then all flow through a single processor and have a single backing
state store.
The aggregator that each record gets sent to is determined by
context().topic(), which we should be able to trace back to one of the
KGroupedStreams in the list.

The problem I am having with this approach is that everything goes through a
single processor, and Java does not handle generic types well in that case.
I have to either pass in a list of Type objects for casting each value before
sending it to the aggregator, or create aggregators that accept an Object
and cast it to the appropriate type.
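
As a rough illustration of the first option, the sketch below keeps a Class token next to each aggregator and looks the pair up by topic name. This is plain Java, not Kafka Streams code. `ValueAggregator`, `TypedAggregator`, and `TopicDispatcher` are hypothetical names, and the topic string is assumed to be whatever context().topic() would return.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical aggregator shape: (key, value, aggregate) -> aggregate.
interface ValueAggregator<V, A> {
    A apply(String key, V value, A aggregate);
}

// Pairs an aggregator with the Class token needed to cast an untyped value.
final class TypedAggregator<V, A> {
    private final Class<V> valueType;
    private final ValueAggregator<V, A> aggregator;

    TypedAggregator(Class<V> valueType, ValueAggregator<V, A> aggregator) {
        this.valueType = valueType;
        this.aggregator = aggregator;
    }

    A applyUntyped(String key, Object value, A aggregate) {
        // Class.cast throws ClassCastException if the value has the wrong type.
        return aggregator.apply(key, valueType.cast(value), aggregate);
    }
}

// Single processor entry point: picks the aggregator from the source topic name.
final class TopicDispatcher<A> {
    private final Map<String, TypedAggregator<?, A>> byTopic = new HashMap<>();

    <V> void register(String topic, Class<V> valueType, ValueAggregator<V, A> aggregator) {
        byTopic.put(topic, new TypedAggregator<>(valueType, aggregator));
    }

    A dispatch(String topic, String key, Object value, A aggregate) {
        TypedAggregator<?, A> typed = byTopic.get(topic);
        if (typed == null) {
            throw new IllegalArgumentException("No aggregator registered for topic " + topic);
        }
        return typed.applyUntyped(key, value, aggregate);
    }
}
```

The cast is confined to one place, but the caller still has to hand over one Class object per stream, which is the awkwardness I was describing.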

Approach 2:
Create one processor for each aggregator and have a single state store. Then 
have a single KStreamPassThrough that just passes on the new aggregate value.
The positive for this is you know which stream it will be coming from and won't 
need to do the context().topic() trick.

The problem I am having with this approach is understanding if there is a race 
condition. Obviously the source topics would be copartitioned. But would it be 
multithreaded and possibly cause one of the processors to grab patient 1 at the 
same time a different processor has grabbed patient 1?
My understanding is that for each partition there would be a single complete 
set of processors and a new incoming record would go completely through the 
processor topology from a source node to a sink node before the next one is 
sent through. Is this correct?
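
To show the execution model I am assuming (this models my understanding only, not confirmed Kafka Streams behavior), here is a small plain-Java sketch. One hypothetical `PartitionTask` owns a shared store and one processor per input stream, and each record is read, aggregated, written back, and forwarded before the next record starts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-stream processor: folds one value into the shared aggregate.
interface StreamProcessor {
    List<String> process(String key, String value, List<String> aggregate);
}

// Hypothetical model of one task: one partition, one shared store, many processors.
final class PartitionTask {
    private final Map<String, List<String>> sharedStore = new HashMap<>();
    private final Map<String, StreamProcessor> processorsBySource = new HashMap<>();
    // Stands in for the pass-through node that forwards each new aggregate downstream.
    final List<String> forwarded = new ArrayList<>();

    void addProcessor(String source, StreamProcessor processor) {
        processorsBySource.put(source, processor);
    }

    // Called for one record at a time from a single thread, so the
    // read-modify-write on the shared store never interleaves within a task.
    void process(String source, String key, String value) {
        StreamProcessor processor = processorsBySource.get(source);
        if (processor == null) {
            throw new IllegalArgumentException("Unknown source " + source);
        }
        List<String> current = sharedStore.getOrDefault(key, new ArrayList<>());
        List<String> updated = processor.process(key, value, current);
        sharedStore.put(key, updated);
        forwarded.add(key + "=" + updated);
    }
}
```

If that assumption holds, two processors can never both be partway through an update of patient 1, because the second record is not picked up until the first has been forwarded.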

If anyone has any additional ideas about this let me know. I don't know if I 
have the time to actually create this api so if someone likes the idea and 
wants to develop it feel free.
