Hi Dominique,

your problem sounds like a good use case for session windows [1, 2]. If you know the maximum gap between a request and its response, you can create a session window like this:

    input
        .keyBy("ReqRespID")
        .window(EventTimeSessionWindows.withGap(Time.minutes(MaxTimeBetweenReqResp)))
        .<windowed transformation>(/* calculate time */);

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#session-windows
[2] http://data-artisans.com/session-windowing-in-flink/

Cheers,
Till

On Tue, Jul 19, 2016 at 7:04 PM, Sameer W <sam...@axiomine.com> wrote:

> How about using EventTime windows with watermark assignment and bounded
> delays? That way you allow more than 5 minutes (the bounded delay) for your
> requests and responses to arrive. Do you have a way to assign a timestamp to
> the responses based on the request timestamp (does the response contain the
> request timestamp in some form)? That way you add them to the same window.
>
> Sameer
>
> On Tue, Jul 19, 2016 at 12:31 PM, Dominique Rondé <
> dominique.ro...@allsecur.de> wrote:
>
>> Hi all,
>>
>> once again I need a "kick" in the right direction. I have a datastream
>> with requests and responses identified by a ReqResp-ID. I would like to
>> calculate the (avg, 95%, 99%) time between the request and response and
>> also count them. I thought
>> ".keyBy("ReqRespID").timeWindowAll(Time.minutes(5)).apply(function)" would
>> do the job, but there are some cases where a request is in the first and
>> the response is in the second window. But if I use an overlapping time
>> window (i.e. timeWindowAll(Time.minutes(5),Time.seconds(60))) I get a lot
>> of requests more than once in the apply-function.
>>
>> Do you have any hint for me?
>>
>> Thanks a lot!
>>
>> Dominique
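
P.S. Regarding the "/* calculate time */" placeholder in the session-window snippet: here is a minimal sketch of the arithmetic such a step might perform, written in plain Java without any Flink dependency so it can be tried on its own. Everything in it is an assumption made for illustration: the Event class, its fields, the method names, and the nearest-rank percentile definition are hypothetical and not part of the Flink API. Note also that a window keyed by ReqRespID would only see one request/response pair at a time, so the avg/95%/99% figures would presumably have to be computed in a second step over the emitted latencies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of Flink.
final class LatencyStats {

    // One message of the stream. All field names are assumptions.
    static final class Event {
        final String reqRespId;
        final boolean isRequest;
        final long timestampMillis;

        Event(String reqRespId, boolean isRequest, long timestampMillis) {
            this.reqRespId = reqRespId;
            this.isRequest = isRequest;
            this.timestampMillis = timestampMillis;
        }
    }

    // Pairs each request with the response carrying the same ID and returns
    // (response time - request time) for every complete pair.
    static List<Long> latencies(List<Event> events) {
        Map<String, Long> requestTime = new HashMap<>();
        Map<String, Long> responseTime = new HashMap<>();
        for (Event e : events) {
            (e.isRequest ? requestTime : responseTime).put(e.reqRespId, e.timestampMillis);
        }
        List<Long> result = new ArrayList<>();
        for (Map.Entry<String, Long> req : requestTime.entrySet()) {
            Long resp = responseTime.get(req.getKey());
            if (resp != null) { // requests without a response are skipped
                result.add(resp - req.getValue());
            }
        }
        return result;
    }

    // Nearest-rank percentile: the smallest value such that at least p percent
    // of all values are less than or equal to it.
    static long percentile(List<Long> values, double p) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("no values");
        }
        long[] sorted = values.stream().mapToLong(Long::longValue).sorted().toArray();
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(rank, 1) - 1];
    }

    static double average(List<Long> values) {
        return values.stream().mapToLong(Long::longValue).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("a", true, 1_000L),
            new Event("b", true, 2_000L),
            new Event("a", false, 1_250L),
            new Event("b", false, 2_750L),
            new Event("c", true, 3_000L)); // no response yet
        List<Long> l = latencies(events);
        System.out.println("count=" + l.size()
            + " avg=" + average(l)
            + " p95=" + percentile(l, 95)
            + " p99=" + percentile(l, 99));
    }
}
```

In a real job the pairing would happen inside the window function for each key, and the percentile step would run over whatever the first step emits; the sketch only shows the calculation, not how to wire it into a pipeline.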