I am new to Flink, so others may have a more complete answer or may correct me.

If you are counting the events in a tumbling window, you will get one output at 
the end of each tumbling window, i.e. a count of events per window.  It sounds 
like you want to compare the raw data to the smoothed data?  You can connect 
the two streams and use a CoFlatMapFunction to receive both and output any 
records you like, say a Tuple with the raw and smoothed values.  If you use a 
RichCoFlatMapFunction you can keep state, so you could hold a list of the last 
20 or so raw and smoothed values and use it to align them.
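A minimal plain-Java sketch of the alignment idea, assuming timestamps are epoch milliseconds and each smoothed value is labelled with the start of its 15-second window. RawSmoothedAligner, onRaw and onSmoothed are hypothetical names, not Flink APIs. In a real job these would be called from flatMap1/flatMap2 of a RichCoFlatMapFunction, and the buffer would live in Flink managed state rather than a plain field. The sketch keeps only the last N smoothed values and pairs each raw value with the newest smoothed value at or before it.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: buffers recent smoothed values
// and aligns raw values against them by timestamp.
class RawSmoothedAligner {
    private final int capacity;
    // Sorted by timestamp (epoch millis) so floorEntry finds the nearest key <= ts.
    private final TreeMap<Long, Double> smoothed = new TreeMap<>();

    RawSmoothedAligner(int capacity) {
        this.capacity = capacity;
    }

    // Start of the tumbling window of the given size that contains ts.
    static long windowStart(long ts, long sizeMillis) {
        return ts - Math.floorMod(ts, sizeMillis);
    }

    // Would be called for each smoothed record (the flatMap2 side).
    void onSmoothed(long ts, double value) {
        smoothed.put(ts, value);
        // Evict the oldest entries so only the last `capacity` values are kept.
        while (smoothed.size() > capacity) {
            smoothed.pollFirstEntry();
        }
    }

    // Would be called for each raw record (the flatMap1 side).
    // Returns {raw, smoothed}, or null if no smoothed value at or before ts is buffered.
    double[] onRaw(long ts, double rawValue) {
        Map.Entry<Long, Double> e = smoothed.floorEntry(ts);
        if (e == null) {
            return null;
        }
        return new double[] {rawValue, e.getValue()};
    }
}
```

floorEntry matches "at or before"; if the smoothed event must be strictly earlier than the raw one, lowerEntry would be the choice instead. With windowStart, 9:46:38 maps to 9:46:30 for a 15-second window, matching the example in the question. Buffering raw values too, as suggested above, would cover raw events that arrive before the smoothed value for their window; this sketch leaves that out and simply returns null.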

Michael

> On Apr 10, 2018, at 6:40 PM, Ivan Wang <ivan.wang2...@gmail.com> wrote:
> 
> Hi all,
> 
> I've spent nearly 2 weeks trying to figure out a solution to the requirement 
> below. If anyone can advise, that would be great.
> 
> 1. There will be 2000 transactions per second as StreamRaw. I'm going to 
> apply a tumbling window to them, let's say every 15 seconds, to get StreamA. 
> Then I'm going to countWindow StreamA, let's say every 20 events, to get 
> StreamB.
> 
> 2. For every event E in StreamRaw, I need to find exactly one event in 
> StreamB that is earlier than E and closest to E, and then do some comparison. 
> For example, if the timestamp of E is 9:46:38, there should be an event in 
> StreamB with timestamp 9:46:30, because I use a 15-second interval.
> 
> I tried CEP on StreamRaw; however, I couldn't figure out how to involve 
> StreamB and get exactly that one event in the condition method.
> 
> I tried the Table API and SQL, but they throw a time attribute error at the 
> second window call:
> 
> window(Tumble).group().select().window(Slide).group().select()
> 
> It seems there's no way to tell Flink the time attribute after the first 
> window.group(). I then tried to convert both into tables first and then 
> leftOuterJoin them, but Flink tells me that's not supported.
> 
> Is Flink able to do this? If not, I'll look at other alternatives. Thanks 
> again to anyone who can help.