Hi Fabian, I'm still eager to expose # of active sessions as a key metric of our service but I haven’t figured it out yet.
First of all, I want to ask you some questions regarding your suggestion. > You could implement a Trigger that fires when a new window is created and > when the window is closed. A ProcessWindowFunction would emit a +1 if the > window was created and a -1 when the window is closes. > Session windows are a bit special, because you also need to handle the case > of merging windows, i.e., two opened windows can be merged and only one (the > merged) window is closed. So would need to emit a -2 if a merged window was > closes (assuming only two windows were merged). Q1) How to fire when a new window is created and when the window is closed? AFAIK, we can return TriggerResult only through the three functions: onElement, onEventTime, and onProcessingTime. Q2) Firing is to emit elements in windows down to the window function, not emitting values like +1, -1 and -2 which are not in windows. Or do I miss something that you meant? > In order to do that, you'd need to carry the merging information forward. The > Trigger.onMerge method cannot trigger the window function, but it could store > the merging information in state that is later accessed. Q3) I didn't understand what you mean at all. What do you mean by carrying the merging information? Besides your suggestion, I implemented a custom trigger which is almost the same as EventTimeTrigger except the followings: - it maintains a variable to count sessions in an instance of a window operator - it increases the variable by 1 when onElement is invoked - it decreases the variable by 1 when onClose is invoked Considering the logic of Flink’s session window, it correctly counts sessions in an instance of a window operator. As you might have already noticed, this approach has a critical problem: there's no way to maintain an operator state inside a trigger. TriggerContext only allows to interact with state that is scoped to the window and the key of the current trigger invocation (as shown in Trigger#TriggerContext) Now I've come to a conclusion that it might not be possible using DataStream API. Otherwise, do I need to think in a totally different way to achieve the goal? Best, - Dongwon > 2018. 2. 20. 오후 6:53, Fabian Hueske <[email protected]> 작성: > > Hi Dongwon Kim, > > That's an interesting question. > > I don't have a solution blueprint for you, but a few ideas that should help > to solve the problem. > > I would start with a separate job first and later try to integrate it with > the other job. > You could implement a Trigger that fires when a new window is created and > when the window is closed. A ProcessWindowFunction would emit a +1 if the > window was created and a -1 when the window is closes. > Session windows are a bit special, because you also need to handle the case > of merging windows, i.e., two opened windows can be merged and only one (the > merged) window is closed. So would need to emit a -2 if a merged window was > closes (assuming only two windows were merged). > In order to do that, you'd need to carry the merging information forward. The > Trigger.onMerge method cannot trigger the window function, but it could store > the merging information in state that is later accessed. > > Hope this helps, > Fabian > > 2018-02-20 9:54 GMT+01:00 Dongwon Kim <[email protected]>: >> Hi, >> >> It could be a totally stupid question but I currently have no idea how to >> get the number of active session windows from a running job. >> >> Our traffic trajectory application (which handles up to 10,000 tps) uses >> event-time session window on KeyedStream (keyed by userID). >> >> Should I write another Flink job for the purpose? >> >> Cheers, >> >> Dongwon Kim
