I am new to Flink, so others may have a more complete answer or may correct me. If you count the events in a tumbling window, you get one output at the end of each window, so a stream of per-window counts. It sounds like you want to compare the raw data to the smoothed data? You can connect the two streams and use a CoFlatMapFunction to receive both and output any records you like, say a Tuple with the raw and smoothed value. If you use a RichCoFlatMapFunction you can track state, so you could keep a list of the last 20 or so raw and smoothed values and use it to align them.
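To make the alignment idea concrete, here is a rough sketch of the bookkeeping in plain Java, with no Flink dependency so it can be run on its own. The class, the `Event` and `Aligned` types, and the `onRaw`/`onSmoothed` method names are all made up for illustration; in a real job I would expect this logic to sit in the `flatMap1` and `flatMap2` methods of a RichCoFlatMapFunction, with the buffer held in Flink state rather than a plain field. It also assumes the smoothed events arrive in timestamp order and that "closest earlier" means "at or before".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Plain-Java model (no Flink dependency) of the state a RichCoFlatMapFunction might keep.
class RawSmoothedAligner {

    // Hypothetical event type: a timestamp in milliseconds plus a value.
    static final class Event {
        final long timestamp;
        final double value;

        Event(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Hypothetical output type: a raw event and the smoothed event matched to it.
    static final class Aligned {
        final Event raw;
        final Event smoothed;

        Aligned(Event raw, Event smoothed) {
            this.raw = raw;
            this.smoothed = smoothed;
        }
    }

    private static final int MAX_KEPT = 20; // "the last 20 or so" smoothed values

    // Oldest at the head, newest at the tail.
    // Assumption: smoothed events arrive in timestamp order.
    private final Deque<Event> recentSmoothed = new ArrayDeque<>();

    // Would be called from flatMap2 (smoothed stream): remember the event, emit nothing.
    void onSmoothed(Event smoothed) {
        recentSmoothed.addLast(smoothed);
        if (recentSmoothed.size() > MAX_KEPT) {
            recentSmoothed.removeFirst(); // drop the oldest to bound the state
        }
    }

    // Would be called from flatMap1 (raw stream): emit zero or one aligned pair.
    List<Aligned> onRaw(Event raw) {
        List<Aligned> out = new ArrayList<>();
        Iterator<Event> newestFirst = recentSmoothed.descendingIterator();
        while (newestFirst.hasNext()) {
            Event candidate = newestFirst.next();
            // First hit walking backwards is the closest smoothed event at or before raw.
            if (candidate.timestamp <= raw.timestamp) {
                out.add(new Aligned(raw, candidate));
                break;
            }
        }
        return out;
    }
}
```

A raw event that arrives before any matching smoothed event has been seen produces no output here; whether to buffer such events or drop them depends on what you need.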
Michael

> On Apr 10, 2018, at 6:40 PM, Ivan Wang <ivan.wang2...@gmail.com> wrote:
>
> Hi all,
>
> I've spent nearly 2 weeks trying to figure a solution to my requirement as
> below. If anyone can advise, that would be great.
>
> 1. There're going to be 2000 transactions per second as StreamRaw, I'm going
> to tumbleWindow them as StreamA, let's say every 15 seconds. Then I'm going
> to countWindow StreamA as StreamB, let's say every 20 events.
>
> 2. For every event in StreamRaw as E, I need to find exact one event in
> StreamB which is earlier than E and closest to E. Then some comparison will
> be proceeded. For example, if timestamp in E is 9:46:38, there should be an
> event in StreamB with timestamp 9:46:30 because I use 15 seconds interval.
>
> I tried CEP using StreamRaw, however, I didn't figure out how to involve
> StreamB and get the exact one event in condition method.
>
> I tried tableAPI and SQL, it throws time attribute error during the second
> window method.
>
> window(Tumble).group().select().window(Slide).group().select()
>
> Seems there's no way to tell Flink the time attribute after the first
> window.group(). I then tried to convert it into table first then leftoutJoin
> them. But Flink tells me it's not supported.
>
> Is Flink able to do this? If not, I'll go for other alternatives. Thanks
> again if someone can help.