I am trying to join two keyed streams in a particular way to get a combined
stream.
For example:
Let's call the two streams X and Y.
The X stream contains:

(Key,Value)

(A,P)
(A,Q)
(A,R)
(B,P)
(C,P)
(C,Q)

The Y stream contains:

(Key,Value,Flag1,Flag2)

(A,M1,0,0)
(A,M2,0,0)
(A,M3,1,0)
(A,M4,0,0)
(A,M5,1,0)
(A,M6,0,1)
(B,N1,0,0)
(B,N2,1,0)
(B,N3,0,1)
(C,O1,1,0)
(C,O2,0,1)

My objective is to join these two streams and get the combined value
described below. From the X stream I want a per-key aggregation of the
"Value" field. From the Y stream I want a per-key aggregation of the "Value"
field grouped by "Flag1", i.e., the output is a set of aggregated values, one
per "Flag1" value. I want to join these two streams using a keyed window that
is triggered only when the "Flag2" value for a particular key in the Y stream
is "1". The flag values are available only in the Y stream, not in the X
stream. Thus my end result should look like:

(Key,Value1,Value2)

(A,(P#Q#R),[(M1#M2#M4#M6),(M3#M5)])
(B,P,[(N1#N3),(N2)])
(C,(P#Q),[(O1),(O2)])

The rows are timed such that, by the time Flag2 is 1 for a key in stream Y
(which marks the end of a session for that key), all of that key's rows in
stream X have already arrived.
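
To make the intended semantics concrete, here is a minimal plain-Java sketch
of the per-key logic, with no Flink dependency. The class and method names
(SessionJoin, onX, onY) are hypothetical, and the two maps only stand in for
keyed state. In Flink the same shape could presumably be expressed by
connecting the two keyed streams and using a RichCoFlatMapFunction or
CoProcessFunction with keyed state, but that mapping is an assumption, not
something I have working. Groups are listed in the order in which each Flag1
value is first seen for the key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for per-key state; all names here are hypothetical. */
class SessionJoin {
    // Buffered X values per key (assumption: this would be keyed state in Flink).
    private final Map<String, List<String>> xValues = new HashMap<>();
    // Buffered Y values per key, grouped by Flag1 in first-seen order.
    private final Map<String, Map<Integer, List<String>>> yGroups = new HashMap<>();

    /** Called for every X record: only buffers, never emits. */
    void onX(String key, String value) {
        xValues.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /**
     * Called for every Y record: buffers the value under its Flag1 group.
     * Returns the joined row when Flag2 == 1 (end of session), else null.
     */
    String onY(String key, String value, int flag1, int flag2) {
        yGroups.computeIfAbsent(key, k -> new LinkedHashMap<>())
               .computeIfAbsent(flag1, f -> new ArrayList<>())
               .add(value);
        if (flag2 != 1) {
            return null; // session still open, keep buffering
        }
        // End of session: read the state for this key and clear it.
        List<String> xs = xValues.remove(key);
        Map<Integer, List<String>> ys = yGroups.remove(key);

        // A single X value is printed bare, several are wrapped in parentheses.
        String left = xs == null ? "()"
                : xs.size() == 1 ? xs.get(0)
                : "(" + String.join("#", xs) + ")";

        List<String> groups = new ArrayList<>();
        for (List<String> group : ys.values()) {
            groups.add("(" + String.join("#", group) + ")");
        }
        return "(" + key + "," + left + ",[" + String.join(",", groups) + "])";
    }

    public static void main(String[] args) {
        SessionJoin join = new SessionJoin();
        join.onX("B", "P");
        join.onY("B", "N1", 0, 0);
        join.onY("B", "N2", 1, 0);
        // Flag2 == 1 closes the session for key B and emits the joined row.
        System.out.println(join.onY("B", "N3", 0, 1)); // (B,P,[(N1#N3),(N2)])
    }
}
```

The point of the sketch is that the state is only read at one place: when a Y
record with Flag2 = 1 arrives for the key. Until then both inputs just append
to the buffers.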

I tried to maintain state inside my join function to produce this output,
but I don't know how to query the state or when to do it. Can anyone please
suggest a solution?

--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-aggregated-streams-tp14123.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
