Hi Dongwon,
You are of course right! We need to decrement the counter
when the window is closed.
The idea of using the Trigger.clear() method (the clean-up method
is called clear(), not onClose()) is great!
It is called when a window is closed, but also when it
is merged.
So, I think you are right: we only need to increment the
counter in Trigger.onElement() and decrement it in Trigger.clear().
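To make the rule concrete, here is a toy model of one key's session windows. It is plain Java with no Flink dependency. `SessionCountingModel` and its methods are hypothetical. The private `onElement()` and `clear()` hooks only mimic the points where a real Trigger would update a Counter metric. Following the description above, `clear()` runs both for a window that is merged away and for a window that expires. The sketch makes two assumptions:

- Timestamps strictly ascend per key, which is the assumption Dongwon makes for his data.
- Allowed lateness is zero.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model (not Flink code) of one key's session windows, used to check the rule
// "increment in onElement(), decrement in clear()". Assumes strictly ascending
// timestamps per key and zero allowed lateness.
class SessionCountingModel {

    /** A half-open session window [start, end). */
    static final class Window {
        final long start;
        final long end;
        Window(long start, long end) { this.start = start; this.end = end; }
    }

    private final long gap;
    private final List<Window> live = new ArrayList<>();  // ordered by start
    private long lastTimestamp = Long.MIN_VALUE;
    private long counter = 0;  // stands in for the Counter metric a trigger would update

    SessionCountingModel(long gap) { this.gap = gap; }

    // Hypothetical hooks named after Trigger.onElement() and Trigger.clear().
    private void onElement(Window w) { counter++; }
    private void clear(Window w) { counter--; }

    /** Gives the record its own window [ts, ts + gap) and merges it if needed. */
    void processElement(long ts) {
        if (ts <= lastTimestamp) {
            throw new IllegalArgumentException("model assumes strictly ascending timestamps");
        }
        lastTimestamp = ts;
        Window result = new Window(ts, ts + gap);
        // With ascending timestamps, only the most recent live window can overlap.
        if (!live.isEmpty()) {
            Window last = live.get(live.size() - 1);
            if (last.end >= result.start) {        // overlaps or touches
                live.remove(live.size() - 1);
                clear(last);                       // the merged-away window is cleaned up
                result = new Window(last.start, Math.max(last.end, result.end));
            }
        }
        live.add(result);
        onElement(result);                         // called for the (possibly merged) window
    }

    /** Closes every window whose last timestamp (end - 1) the watermark has reached. */
    void advanceWatermark(long watermark) {
        Iterator<Window> it = live.iterator();
        while (it.hasNext()) {
            Window w = it.next();
            if (w.end - 1 <= watermark) {
                it.remove();
                clear(w);                          // the expired window is cleaned up
            }
        }
    }

    long counter() { return counter; }
    int liveWindows() { return live.size(); }

    public static void main(String[] args) {
        SessionCountingModel m = new SessionCountingModel(10);
        m.processElement(0);
        m.processElement(5);   // merges into [0, 15)
        m.processElement(30);  // opens [30, 40)
        System.out.println(m.counter() + " live sessions");
    }
}
```

In this model, each merge pairs one clear() with one onElement(), so the counter always equals the number of live windows. The counter is a plain field, though, so a restart would reset it to 0. That is the fault-tolerance concern discussed in this thread.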
I'm not 100% sure, but I doubt that metrics can be checkpointed.
Chesnay (in CC) would know that.
I'm not sure what the best approach would be if you need a
fault-tolerant solution.
Best, Fabian
2018-06-19 16:38 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
Hi Fabian,
Thanks a lot for your reply.
> Do you need the number of active session windows as a
> DataStream, or would you like to have it as a metric
> that you can expose?
> If possible, I would recommend exposing it as a
> metric, because metrics are usually easier to collect.
I want to have it as a metric, and that doesn't look
difficult thanks to the metric system exposed by
TriggerContext.
> In order to track how many session windows exist, we
> would need to increment a counter by one when a new
> window is created (or an element is assigned to a
> window, which is equivalent for session windows)
I agree that, because of this characteristic of session
windows, we need to increment a counter whenever
Trigger.onElement() is called.
> and, when windows are merged, decrement the counter by
> the number of merged windows minus one.
You decrement the counter only when windows are merged, but I
think we also need to decrement it when a window
expires.
> However, decrementing the counter is difficult.
> Although the Trigger.onMerge() method is called, it
> does not know how many windows were merged (which is
> done by the WindowAssigner) and only sees the merged
> window.
We assume that the timestamps of records from a user are in
ascending order, so only one window is closed at a time,
which simplifies the problem of how to decrement the counter.
Nevertheless, I think I need to decrement the counter in
Trigger.onClose(), not Trigger.onMerge().
By doing that in Trigger.onClose(), we can take care of
both cases: when a window is merged and when a window is
expired.
What do you think?
I mentioned state because I want to calculate the exact
number of active sessions even after my Flink application
is restarted from a checkpoint or savepoint.
If we restore from a savepoint and the counter is
initialized to 0, we'll see an incorrect value on the
dashboard.
This is my biggest concern at this point.
Best,
- Dongwon
On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske
<fhue...@gmail.com> wrote:
Hi Dongwon,
Do you need the number of active session windows as a
DataStream, or would you like to have it as a metric
that you can expose?
If possible, I would recommend exposing it as a
metric, because metrics are usually easier to collect.
SessionWindows work internally as follows:
- every new record is added to a new window that
starts at the timestamp of the record and ends at
timestamp + gap size. When a record is added to a
window, Trigger.onElement() is called.
- after a window is created, the session window
assigner tries to merge windows with overlapping
ranges. When windows are merged, Trigger.onMerge() is
called.
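Here is a minimal sketch of the two steps above, in plain Java and independent of Flink. Every record gets its own window [ts, ts + gap), and overlapping windows collapse into one session. The helper `SessionMerging.sessions()` is hypothetical. It merges in one batch after sorting, whereas the window assigner works incrementally as records arrive; the final sessions are the same. It also merges windows that merely touch (end == start). Whether that matches Flink's exact boundary rule is an assumption here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch (not Flink code): one window [ts, ts + gap) per record,
// overlapping or touching windows merged into a single session.
class SessionMerging {

    /** Returns the merged sessions as {start, end} pairs, sorted by start. */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> merged = new ArrayList<>();
        for (long ts : sorted) {
            long start = ts;
            long end = ts + gap;                      // the record's own window
            if (!merged.isEmpty()) {
                long[] last = merged.get(merged.size() - 1);
                if (last[1] >= start) {               // overlaps or touches the previous session
                    last[1] = Math.max(last[1], end); // extend it instead of opening a new one
                    continue;
                }
            }
            merged.add(new long[] {start, end});
        }
        return merged;
    }

    public static void main(String[] args) {
        // Out-of-order input is fine here because the timestamps are sorted first.
        for (long[] w : sessions(new long[] {22, 1, 40, 3, 20}, 5)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For the timestamps 22, 1, 40, 3, 20 and a gap of 5, `main` prints [1, 8), [20, 27) and [40, 45).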
In order to track how many session windows exist, we
would need to increment a counter by one when a new
window is created (or an element is assigned to a
window, which is equivalent for session windows) and,
when windows are merged, decrement the counter by the
number of merged windows minus one.
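The bookkeeping rule above can be checked with a small plain-Java model: add 1 for each new window, and on each merge subtract the number of merged windows minus one. `MergeCounter` is hypothetical and covers a single key. Unlike a real Trigger.onMerge(), it knows exactly how many windows take part in each merge, which is the information the trigger lacks. The sample input includes a late record that bridges two sessions, so one merge involves three windows.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of the counting rule for one key:
// +1 for every new per-record window, -(k - 1) whenever k windows merge into one.
class MergeCounter {

    private final long gap;
    private final List<long[]> windows = new ArrayList<>();  // {start, end}, pairwise disjoint
    private long counter = 0;

    MergeCounter(long gap) { this.gap = gap; }

    void add(long ts) {
        long start = ts;
        long end = ts + gap;
        counter += 1;            // a new window is created for the record
        int k = 1;               // windows taking part in the merge, including the new one
        boolean absorbed = true;
        while (absorbed) {       // absorb every window that overlaps or touches the growing one
            absorbed = false;
            for (int i = 0; i < windows.size(); i++) {
                long[] w = windows.get(i);
                if (w[0] <= end && w[1] >= start) {
                    start = Math.min(start, w[0]);
                    end = Math.max(end, w[1]);
                    windows.remove(i);
                    k++;
                    absorbed = true;
                    break;
                }
            }
        }
        counter -= (k - 1);      // k windows became one
        windows.add(new long[] {start, end});
    }

    long counter() { return counter; }
    int windowCount() { return windows.size(); }

    public static void main(String[] args) {
        MergeCounter c = new MergeCounter(10);
        for (long ts : new long[] {0, 8, 25, 16}) {  // 16 arrives late and bridges two sessions
            c.add(ts);
            System.out.println("after " + ts + ": counter=" + c.counter()
                    + ", windows=" + c.windowCount());
        }
    }
}
```

With a gap of 10 and records 0, 8, 25, 16, the counter reads 1, 1, 2, 1 and always equals the number of windows. The late record at 16 adds a third window (+1) and merges all three into one (-2).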
Incrementing the counter is rather easy and can be
done in Trigger.onElement(), either by using state or
a Counter metric (Triggers have access to the metric
system).
However, decrementing the counter is difficult.
Although the Trigger.onMerge() method is called, it
does not know how many windows were merged (which is
done by the WindowAssigner) and only sees the merged
window. There might be a way to maintain state in a
Trigger that makes it possible to infer how many windows
were merged.
Best, Fabian
2018-06-16 16:39 GMT+02:00 Dongwon Kim
<eastcirc...@gmail.com>:
Hi Fabian,
I'm still eager to expose the number of active sessions as
a key metric of our service, but I haven't figured
it out yet.
First of all, I want to ask you some questions
regarding your suggestion.
> You could implement a Trigger that fires when a
> new window is created and when the window is
> closed. A ProcessWindowFunction would emit a +1
> when the window is created and a -1 when the
> window is closed.
> Session windows are a bit special, because you
> also need to handle the case of merging windows,
> i.e., two open windows can be merged and only
> one (the merged) window is closed. So you would need
> to emit a -2 when a merged window is closed
> (assuming only two windows were merged).
Q1)
How can a trigger fire both when a new window is created and when
the window is closed?
AFAIK, a TriggerResult can only be returned from
the three methods onElement, onEventTime, and
onProcessingTime.
Q2)
Firing emits the elements of a window down to the
window function; it does not emit values like +1, -1,
and -2, which are not in the window.
Or am I missing something you meant?
> In order to do that, you'd need to carry the
> merging information forward. The Trigger.onMerge
> method cannot trigger the window function, but
> it could store the merging information in state
> that is later accessed.
Q3)
I don't understand what you mean at all. What do
you mean by carrying the merging information forward?
Besides trying your suggestion, I implemented a custom
trigger that is almost the same as
EventTimeTrigger except for the following:
- it maintains a variable to count sessions in an
instance of a window operator
- it increases the variable by 1 when onElement
is invoked
- it decreases the variable by 1 when onClose is
invoked
Given how Flink's session windows work,
it correctly counts sessions in an instance of a
window operator.
As you might have already noticed, this approach
has a critical problem: there's no way to
maintain operator state inside a trigger.
TriggerContext only allows interacting with state
that is scoped to the window and the key of the
current trigger invocation (as shown in
Trigger#TriggerContext).
Now I've come to the conclusion that it might not
be possible with the DataStream API.
Or do I need to think about the problem in a totally
different way to achieve the goal?
Best,
- Dongwon
On Feb 20, 2018, at 6:53 PM, Fabian Hueske
<fhue...@gmail.com> wrote:
Hi Dongwon Kim,
That's an interesting question.
I don't have a solution blueprint for you, but I have a
few ideas that should help to solve the problem.
I would start with a separate job first and
later try to integrate it with the other job.
You could implement a Trigger that fires when a
new window is created and when the window is
closed. A ProcessWindowFunction would emit a +1
when the window is created and a -1 when the
window is closed.
Session windows are a bit special, because you
also need to handle the case of merging windows,
i.e., two open windows can be merged and only
one (the merged) window is closed. So you would need
to emit a -2 when a merged window is closed
(assuming only two windows were merged).
In order to do that, you'd need to carry the
merging information forward. The Trigger.onMerge
method cannot trigger the window function, but
it could store the merging information in state
that is later accessed.
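The +1/-n idea can be modelled without Flink as a stream of deltas whose running sum is the session count. The hypothetical `DeltaStreamModel` works like this:

- A record that opens a new session emits +1.
- A merged session carries forward how many counted sessions it absorbed (the "merging information").
- A closing session emits minus that number.

The model deliberately sidesteps how a Trigger could actually fire for these events. It assumes a single key and zero allowed lateness.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of the "+1 / -n" idea for one key: a record that
// opens a new session emits +1, merged sessions carry forward how many counted
// sessions they absorbed, and a closing session emits minus that number.
class DeltaStreamModel {

    private static final class Session {
        final long start;
        final long end;
        final int counted;  // counted sessions merged into this one
        Session(long start, long end, int counted) {
            this.start = start;
            this.end = end;
            this.counted = counted;
        }
    }

    private final long gap;
    private final List<Session> open = new ArrayList<>();
    final List<Integer> emitted = new ArrayList<>();  // the stream of deltas

    DeltaStreamModel(long gap) { this.gap = gap; }

    void onRecord(long ts) {
        long start = ts;
        long end = ts + gap;
        int counted = 0;
        for (int i = open.size() - 1; i >= 0; i--) {
            Session o = open.get(i);
            if (o.start <= end && o.end >= start) {  // overlaps or touches an open session
                start = Math.min(start, o.start);
                end = Math.max(end, o.end);
                counted += o.counted;                // carry the merging information forward
                open.remove(i);
            }
        }
        if (counted == 0) {                          // the record opened a brand-new session
            counted = 1;
            emitted.add(+1);
        }
        open.add(new Session(start, end, counted));
    }

    /** Closes sessions whose last timestamp (end - 1) the watermark has reached. */
    void onWatermark(long watermark) {
        for (int i = open.size() - 1; i >= 0; i--) {
            Session s = open.get(i);
            if (s.end - 1 <= watermark) {
                emitted.add(-s.counted);             // e.g. -2 if two sessions were merged
                open.remove(i);
            }
        }
    }

    /** What a downstream running sum over the deltas would report. */
    int activeSessions() {
        int sum = 0;
        for (int d : emitted) sum += d;
        return sum;
    }

    int openSessions() { return open.size(); }

    public static void main(String[] args) {
        DeltaStreamModel m = new DeltaStreamModel(10);
        for (long ts : new long[] {0, 8, 25, 16}) m.onRecord(ts);
        System.out.println("sum=" + m.activeSessions() + ", open=" + m.openSessions());
        m.onWatermark(40);
        System.out.println("deltas=" + m.emitted);
    }
}
```

In this model, the running sum is exact as long as counted sessions never merge, which holds for ascending timestamps. A late record can bridge two counted sessions, as the record at 16 in `main` does. The sum then stays at 2 while only one session is open. It only catches up when the merged session closes and emits -2.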
Hope this helps,
Fabian
2018-02-20 9:54 GMT+01:00 Dongwon Kim
<eastcirc...@gmail.com>:
Hi,
It could be a totally stupid question, but I
currently have no idea how to get the number
of active session windows from a running job.
Our traffic trajectory application (which
handles up to 10,000 tps) uses event-time
session windows on a KeyedStream (keyed by userID).
Should I write another Flink job for this
purpose?
Cheers,
Dongwon Kim