Hi Fabian,

I'm still eager to expose the number of active sessions as a key metric of 
our service, but I haven’t figured out how to do it yet.

First of all, I want to ask you some questions regarding your suggestion.
> You could implement a Trigger that fires when a new window is created and 
> when the window is closed. A ProcessWindowFunction would emit a +1 if the 
> window was created and a -1 when the window is closed.
> Session windows are a bit special, because you also need to handle the case 
> of merging windows, i.e., two opened windows can be merged and only one (the 
> merged) window is closed. So you would need to emit a -2 if a merged window 
> was closed (assuming only two windows were merged).

Q1) 
How can a trigger fire when a new window is created and when the window is 
closed?
AFAIK, a TriggerResult can only be returned from three methods: onElement, 
onEventTime, and onProcessingTime.
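
For what it's worth, one possible reading of the suggestion is that those 
methods may be enough: onElement can fire for the first element a window sees 
(the "created" case) and onEventTime can fire at the end-of-window timer (the 
"closed" case). Below is a minimal plain-Java sketch of that decision logic 
only. It has no Flink dependency; FirstAndLastTrigger, Result, and seen are 
hypothetical stand-ins, not Flink's API, and window merging is ignored.

```java
import java.util.HashSet;
import java.util.Set;

class FirstAndLastTrigger {
    // Hypothetical stand-in for Flink's TriggerResult (only the two values needed here).
    enum Result { CONTINUE, FIRE }

    // Stand-in for per-window trigger state: windows that have already seen
    // an element, identified here by their max timestamp.
    private final Set<Long> seen = new HashSet<>();

    // First element of a window: treat it as "window created" and fire.
    Result onElement(long windowMaxTimestamp) {
        return seen.add(windowMaxTimestamp) ? Result.FIRE : Result.CONTINUE;
    }

    // Timer callback: fire once more if this is the window's end-of-window timer.
    Result onEventTime(long time, long windowMaxTimestamp) {
        if (time == windowMaxTimestamp && seen.remove(windowMaxTimestamp)) {
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        FirstAndLastTrigger t = new FirstAndLastTrigger();
        System.out.println(t.onElement(99L));        // FIRE (window created)
        System.out.println(t.onElement(99L));        // CONTINUE
        System.out.println(t.onEventTime(50L, 99L)); // CONTINUE (unrelated timer)
        System.out.println(t.onEventTime(99L, 99L)); // FIRE (window closed)
    }
}
```

The window function would still have to tell the two firings apart, which 
this sketch does not address.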
Q2)
As I understand it, firing passes the elements in a window down to the window 
function; it does not emit values like +1, -1, and -2, which are not in any 
window.
Or am I missing something that you meant?

> In order to do that, you'd need to carry the merging information forward. The 
> Trigger.onMerge method cannot trigger the window function, but it could store 
> the merging information in state that is later accessed.

Q3) 
I don't understand this part at all. What do you mean by carrying the 
merging information forward?
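
One way to read "carrying the merging information forward" is that each open 
session remembers how many originally created windows it has absorbed, so 
that closing it can emit the matching negative value. The following is a 
minimal plain-Java simulation of that bookkeeping, not Flink code: 
SessionDeltaSketch, Session, created, and onWatermark are hypothetical names, 
late elements are ignored, and windows that overlap or touch are merged.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SessionDeltaSketch {
    // One open session plus the merging information carried forward:
    // how many originally created sessions it now represents.
    static final class Session {
        long start;
        long end;
        int created;
        Session(long start, long end, int created) {
            this.start = start;
            this.end = end;
            this.created = created;
        }
    }

    final long gap;
    final List<Session> open = new ArrayList<>();
    final List<Integer> emitted = new ArrayList<>();

    SessionDeltaSketch(long gap) {
        this.gap = gap;
    }

    // An element proposes the window [ts, ts + gap]. Sessions it overlaps or
    // touches are merged and their counts summed; otherwise a new session
    // starts and +1 is emitted.
    void onElement(long ts) {
        Session merged = new Session(ts, ts + gap, 0);
        Iterator<Session> it = open.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.start <= ts + gap && ts <= s.end) {
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.created += s.created;
                it.remove();
            }
        }
        if (merged.created == 0) {
            merged.created = 1;
            emitted.add(+1);
        }
        open.add(merged);
    }

    // Closes every session whose last timestamp (end - 1) the watermark has
    // reached, emitting the negated count of sessions it absorbed.
    void onWatermark(long watermark) {
        Iterator<Session> it = open.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.end - 1 <= watermark) {
                emitted.add(-s.created);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        SessionDeltaSketch sketch = new SessionDeltaSketch(10);
        sketch.onElement(0);   // new session, emits +1
        sketch.onElement(18);  // second session, emits +1
        sketch.onElement(9);   // bridges both sessions, emits nothing
        sketch.onWatermark(28);
        System.out.println(sketch.emitted); // [1, 1, -2]
    }
}
```

Note that under this scheme the running sum of emitted values stays at 2 
between the merge and the close, even though only one session is open then; 
the -2 corrects it only when the merged session closes.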

Apart from your suggestion, I implemented a custom trigger that is almost the 
same as EventTimeTrigger except for the following:
- it maintains a variable to count sessions in an instance of a window operator
- it increases the variable by 1 when onElement is invoked
- it decreases the variable by 1 when onClose is invoked
Given how Flink’s session windows work, it correctly counts the sessions 
within a single instance of the window operator. 

As you might have already noticed, this approach has a critical problem: 
there's no way to maintain operator state inside a trigger. 
TriggerContext only allows interaction with state that is scoped to the 
window and the key of the current trigger invocation (as shown in 
Trigger#TriggerContext).

I've now come to the conclusion that this might not be possible with the 
DataStream API.
Or do I need to think about it in a totally different way to achieve the goal?

Best,

- Dongwon



> On Feb 20, 2018, at 6:53 PM, Fabian Hueske <[email protected]> wrote:
> 
> Hi Dongwon Kim,
> 
> That's an interesting question. 
> 
> I don't have a solution blueprint for you, but a few ideas that should help 
> to solve the problem.
> 
> I would start with a separate job first and later try to integrate it with 
> the other job.
> You could implement a Trigger that fires when a new window is created and 
> when the window is closed. A ProcessWindowFunction would emit a +1 if the 
> window was created and a -1 when the window is closed.
> Session windows are a bit special, because you also need to handle the case 
> of merging windows, i.e., two opened windows can be merged and only one (the 
> merged) window is closed. So you would need to emit a -2 if a merged window 
> was closed (assuming only two windows were merged).
> In order to do that, you'd need to carry the merging information forward. The 
> Trigger.onMerge method cannot trigger the window function, but it could store 
> the merging information in state that is later accessed.
> 
> Hope this helps,
> Fabian
> 
> 2018-02-20 9:54 GMT+01:00 Dongwon Kim <[email protected]>:
>> Hi,
>> 
>> It could be a totally stupid question but I currently have no idea how to 
>> get the number of active session windows from a running job.
>> 
>> Our traffic trajectory application (which handles up to 10,000 tps) uses 
>> an event-time session window on a KeyedStream (keyed by userID).
>> 
>> Should I write another Flink job for the purpose?
>> 
>> Cheers,
>> 
>> Dongwon Kim
