Hi Fabian and Chesnay,

Thank you guys.
Fabian: Unfortunately, as Chesnay said, MetricGroup doesn't allow a ProcessWindowFunction to access a counter defined in a Trigger.
Chesnay: I'm going to follow your advice on how to modify Flink.

Thank you very much!

Best,
- Dongwon

On Thu, Jun 21, 2018 at 10:26 PM, Chesnay Schepler <ches...@apache.org> wrote:

> Without modifications to Flink? No. By design, nothing can intercept or retrieve metrics with the metrics API.
> For this pattern, the usual recommendation is to explicitly pass the metric to the components that require it.
>
> If modifications are an option, what you could do is:
> * define a counter in the OperatorIOMetricGroup,
> * have the operator checkpoint/restore the counter,
> * access it in the trigger by casting your way through the MetricGroups to an OperatorMetricGroup, from which you can retrieve the OperatorIOMetricGroup.
>
> On 21.06.2018 11:16, Fabian Hueske wrote:
>
> Hi Dongwon,
>
> Yes, the counter state should be stored in operator state, which is not available to Triggers.
> Chesnay: Can a window function (like ProcessWindowFunction) access (read, write) the counter of its associated Trigger to checkpoint and restore it?
>
> Best, Fabian
>
> 2018-06-20 16:59 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>
>> Hi Fabian and Chesnay,
>>
>> As Chesnay pointed out, it seems that I need to write the current counter (which is defined inside the Trigger) into state, which I think should be the operator state of the window operator.
>> However, as I said previously, TriggerContext only lets users access the partitioned state that is scoped to *the key and* *the window* of the current Trigger invocation.
>> There's no way for me to access the operator state of the window operator through TriggerContext.
>> The partitioned state doesn't seem suitable, as we have more than *ten million keys*.
>> This many keys could break down the metric system as well as external metric systems such as Ganglia and Prometheus.
>>
>> What I want most is to achieve the goal with the current API (I'm using Flink 1.4.2), without modification.
>> But a change to TriggerContext seems unavoidable, because it would have to expose an additional method that lets users like me access the operator state of the window operator.
>>
>> Thank you guys for the useful discussion.
>>
>> P.S. Fabian, yes, you're right. It is Trigger.clear(), not Trigger.onClose().
>>
>> Best,
>> - Dongwon
>>
>> On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler <ches...@apache.org> wrote:
>>
>>> Checkpointing of metrics is a manual process.
>>> The operator must write the current value into state, retrieve it on restore, and restore the counter's count.
>>>
>>> On 20.06.2018 12:10, Fabian Hueske wrote:
>>>
>>> Hi Dongwon,
>>>
>>> You are of course right! We need to decrement the counter when the window is closed.
>>>
>>> The idea of using Trigger.clear() (the clean-up method is called clear(), not onClose()) is great!
>>> It is called when the window is closed, but also when it is merged.
>>> So I think you are right: we only need to increment the counter in Trigger.onElement() and decrement it in Trigger.clear().
>>>
>>> I'm not 100% sure, but I doubt that metrics can be checkpointed. Chesnay (in CC) would know.
>>> I'm not sure what the best approach would be if you need a fault-tolerant solution.
>>>
>>> Best, Fabian
>>>
>>> 2018-06-19 16:38 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>> Thanks a lot for your reply.
>>>>
>>>>> Do you need the number of active session windows as a DataStream, or would you like to have it as a metric that you can expose?
>>>>> If possible, I would recommend exposing it as a metric, because metrics are usually easier to collect.
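The counting scheme Fabian settles on above (increment in Trigger.onElement(), decrement in Trigger.clear()) can be checked with a small, Flink-free model. `SessionHooks`, `CountingHooks`, and `ToySessionOperator` are hypothetical stand-ins, not Flink's Trigger or WindowOperator API. The toy driver assumes strictly ascending timestamps per key, as Dongwon describes, and simplifies window boundaries, so it may not match Flink's exact merge and expiry rules.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical hooks named after the two Trigger callbacks discussed in the thread.
// This is NOT Flink's Trigger API; it only lets the counting logic run on its own.
interface SessionHooks {
    void onElement(long start, long end);

    void clear(long start, long end);
}

// Counts open session windows: +1 in onElement(), -1 in clear().
class CountingHooks implements SessionHooks {
    long active;

    @Override
    public void onElement(long start, long end) {
        active++;
    }

    @Override
    public void clear(long start, long end) {
        active--;
    }
}

// Toy window operator. Assumes strictly ascending timestamps per key, so a new
// record can only merge with that key's latest window. Boundaries are simplified.
class ToySessionOperator {
    private final long gap;
    private final SessionHooks hooks;
    // Open windows per key as {start, end}, oldest first.
    private final Map<String, Deque<long[]>> open = new HashMap<>();

    ToySessionOperator(long gap, SessionHooks hooks) {
        this.gap = gap;
        this.hooks = hooks;
    }

    void processElement(String key, long timestamp) {
        Deque<long[]> windows = open.computeIfAbsent(key, k -> new ArrayDeque<>());
        long start = timestamp;
        long end = timestamp + gap;
        long[] latest = windows.peekLast();
        if (latest != null && timestamp < latest[1]) {
            // The record's window overlaps the latest one: merge them and
            // clear the window that is merged away.
            windows.pollLast();
            hooks.clear(latest[0], latest[1]);
            start = latest[0];
        }
        windows.addLast(new long[] {start, end});
        hooks.onElement(start, end);
    }

    // Windows ending at or before the watermark expire and are cleared.
    void advanceWatermark(long watermark) {
        for (Deque<long[]> windows : open.values()) {
            while (!windows.isEmpty() && windows.peekFirst()[1] <= watermark) {
                long[] expired = windows.pollFirst();
                hooks.clear(expired[0], expired[1]);
            }
        }
    }
}
```

Because clear() runs both for a window that is merged away and for a window that expires, the two hooks alone keep the count equal to the number of open windows in this model.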
>>>>
>>>> I want to have it as a metric, and that doesn't look difficult thanks to the metric system exposed by TriggerContext.
>>>>
>>>>> In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows)
>>>>
>>>> I agree that, given the characteristics of session windows, we need to increment a counter when Trigger.onElement() is called.
>>>>
>>>>> and decrement the counter when windows are merged by the number of merged windows minus one.
>>>>
>>>> You decrement the counter when windows are merged, but I think we also need to decrement it when a window expires.
>>>>
>>>>> However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window.
>>>>
>>>> We assume that the timestamps of records from a user are in ascending order, so only one window is closed at a time, which simplifies the problem of how to decrement the counter.
>>>> Nevertheless, I think I need to decrement the counter in Trigger.onClose(), not Trigger.onMerge().
>>>> By doing it in Trigger.onClose(), we can take care of both cases: when a window is merged and when a window expires.
>>>> What do you think?
>>>>
>>>> The reason I mention state is that I want to calculate the exact number of active sessions even after my Flink application is restarted from a checkpoint or savepoint.
>>>> If we restore from a savepoint and the counter is initialized to 0, we'll see an incorrect value on the dashboard.
>>>> This is my biggest concern at this point.
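Chesnay's description of manual checkpointing (write the current value into state, read it back on restore, and restore the counter's count) addresses the savepoint concern above. The sketch below models it in plain Java; `ToyCounter` and `ToyWindowOperator` are hypothetical, and the `List<Long>` returned by `snapshotState()` only stands in for operator state. It is not Flink's state or metrics API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a metric counter (not org.apache.flink.metrics.Counter).
class ToyCounter {
    private long count;

    void inc() { count++; }

    void inc(long n) { count += n; }

    void dec() { count--; }

    long getCount() { return count; }
}

// Hypothetical operator that checkpoints its counter by hand.
class ToyWindowOperator {
    // A freshly created counter always starts at 0, e.g. after a restart.
    final ToyCounter activeSessions = new ToyCounter();

    // Checkpoint/savepoint: write the current value into (simulated) operator state.
    List<Long> snapshotState() {
        List<Long> state = new ArrayList<>();
        state.add(activeSessions.getCount());
        return state;
    }

    // Restore: read the saved values back and re-apply them to the new counter.
    void initializeState(List<Long> restoredState) {
        for (long saved : restoredState) {
            activeSessions.inc(saved);
        }
    }
}
```

Keeping the snapshot as a list and summing every restored entry is a choice of this sketch: if a restore hands one instance several saved values, the total across instances stays intact, even though which instance holds which part may change.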
>>>>
>>>> Best,
>>>>
>>>> - Dongwon
>>>>
>>>> On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi Dongwon,
>>>>>
>>>>> Do you need the number of active session windows as a DataStream, or would you like to have it as a metric that you can expose?
>>>>> If possible, I would recommend exposing it as a metric, because metrics are usually easier to collect.
>>>>>
>>>>> SessionWindows work internally as follows:
>>>>> - Every new record is added to a new window that starts at the timestamp of the record and ends at timestamp + gap size. When a record is added to a window, Trigger.onElement() is called.
>>>>> - After a window is created, the session window assigner tries to merge windows with overlapping ranges. When windows are merged, Trigger.onMerge() is called.
>>>>>
>>>>> In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows) and, when windows are merged, decrement the counter by the number of merged windows minus one.
>>>>>
>>>>> Incrementing the counter is rather easy and can be done in Trigger.onElement(), either by using state or a Counter metric (Triggers have access to the metric system).
>>>>> However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window. There might be a way to maintain state in a Trigger that allows inferring how many windows were merged.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-06-16 16:39 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> I'm still eager to expose the number of active sessions as a key metric of our service, but I haven't figured out how yet.
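Fabian's counting rule (+1 for every new per-record window, and minus the number of merged windows minus one on every merge) can be checked against a direct count of the windows. The sketch below is a toy model of a single key's session windows in plain Java; `SessionWindowModel` is hypothetical, windows are treated as half-open [start, end) ranges, and expiry is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of one key's session windows (hypothetical, not Flink code).
// Windows are half-open {start, end} ranges and are kept pairwise disjoint.
class SessionWindowModel {
    private final long gap;
    private final List<long[]> windows = new ArrayList<>();
    // Maintained only through the +1 / -(merged - 1) rule, never by counting.
    private long counter;

    SessionWindowModel(long gap) {
        this.gap = gap;
    }

    void add(long timestamp) {
        long newStart = timestamp;
        long newEnd = timestamp + gap;
        counter += 1; // every record first gets a new window of its own

        long start = newStart;
        long end = newEnd;
        int merged = 1; // the new window itself
        for (Iterator<long[]> it = windows.iterator(); it.hasNext(); ) {
            long[] w = it.next();
            if (w[0] < newEnd && newStart < w[1]) { // overlaps the new window
                start = Math.min(start, w[0]);
                end = Math.max(end, w[1]);
                it.remove();
                merged++;
            }
        }
        windows.add(new long[] {start, end});
        counter -= merged - 1; // 'merged' windows collapsed into one
    }

    long counter() {
        return counter;
    }

    int windowCount() {
        return windows.size();
    }
}
```

With a gap of 10, records at 0, 30, and 15 open three windows; a late record at 8 bridges [0, 10) and [15, 25), so three windows collapse into one and the counter drops by two. As Dongwon points out, a real job would still need a separate decrement when a window expires.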
>>>>>>
>>>>>> First of all, I want to ask you some questions regarding your suggestion.
>>>>>>
>>>>>> You could implement a Trigger that fires when a new window is created and when the window is closed. A ProcessWindowFunction would emit a +1 if the window was created and a -1 when the window is closed.
>>>>>> Session windows are a bit special, because you also need to handle the case of merging windows, i.e., two open windows can be merged and only one (the merged) window is closed. So you would need to emit a -2 when a merged window is closed (assuming only two windows were merged).
>>>>>>
>>>>>> Q1)
>>>>>> How do I fire when a new window is created and when the window is closed?
>>>>>> AFAIK, we can return a TriggerResult only from three functions: onElement, onEventTime, and onProcessingTime.
>>>>>> Q2)
>>>>>> Firing sends the elements in a window down to the window function; it doesn't emit values like +1, -1, and -2, which are not in the window.
>>>>>> Or am I missing something you meant?
>>>>>>
>>>>>> In order to do that, you'd need to carry the merging information forward. The Trigger.onMerge method cannot trigger the window function, but it could store the merging information in state that is later accessed.
>>>>>>
>>>>>> Q3)
>>>>>> I didn't understand this at all. What do you mean by carrying the merging information forward?
>>>>>>
>>>>>> Besides your suggestion, I implemented a custom trigger that is almost the same as EventTimeTrigger except for the following:
>>>>>> - it maintains a variable that counts sessions in an instance of a window operator,
>>>>>> - it increases the variable by 1 when onElement is invoked,
>>>>>> - it decreases the variable by 1 when onClose is invoked.
>>>>>> Considering the logic of Flink's session window, it correctly counts the sessions in an instance of a window operator.
>>>>>>
>>>>>> As you might have already noticed, this approach has a critical problem: there's no way to maintain operator state inside a trigger.
>>>>>> TriggerContext only allows interacting with state that is scoped to the window and the key of the current trigger invocation (as shown in Trigger#TriggerContext).
>>>>>>
>>>>>> I've now come to the conclusion that it might not be possible with the DataStream API.
>>>>>> Or do I need to think in a totally different way to achieve the goal?
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> - Dongwon
>>>>>>
>>>>>> On Feb 20, 2018, at 6:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Dongwon Kim,
>>>>>>
>>>>>> That's an interesting question.
>>>>>>
>>>>>> I don't have a solution blueprint for you, but I have a few ideas that should help solve the problem.
>>>>>>
>>>>>> I would start with a separate job first and later try to integrate it with the other job.
>>>>>> You could implement a Trigger that fires when a new window is created and when the window is closed. A ProcessWindowFunction would emit a +1 if the window was created and a -1 when the window is closed.
>>>>>> Session windows are a bit special, because you also need to handle the case of merging windows, i.e., two open windows can be merged and only one (the merged) window is closed. So you would need to emit a -2 when a merged window is closed (assuming only two windows were merged).
>>>>>> In order to do that, you'd need to carry the merging information forward. The Trigger.onMerge method cannot trigger the window function, but it could store the merging information in state that is later accessed.
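Fabian's original +1/-1 idea can be sketched the same way: every created window emits +1, and closing a window emits minus the number of windows merged into it, which is the "merging information" that has to be carried forward. `DeltaEmitter` and its `Window` class are hypothetical plain-Java stand-ins for the trigger, the ProcessWindowFunction, and a downstream job that sums the deltas; none of this is Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the +1 / -1 idea (hypothetical, not Flink API): creating a window
// emits +1, and closing a window emits minus the number of windows merged into it.
class DeltaEmitter {
    // Deltas as a downstream job would receive them.
    final List<Integer> emitted = new ArrayList<>();

    static final class Window {
        // The "merging information" carried forward until the window closes.
        int mergedCount = 1;
    }

    Window create() {
        emitted.add(1);
        return new Window();
    }

    // Merging emits nothing; it only remembers how many windows were combined.
    Window merge(Window a, Window b) {
        Window result = new Window();
        result.mergedCount = a.mergedCount + b.mergedCount;
        return result;
    }

    void close(Window w) {
        emitted.add(-w.mergedCount); // -1 for a plain window, -2 for two merged ones
    }

    // Downstream aggregation: the running sum of all deltas so far.
    int runningSum() {
        int sum = 0;
        for (int delta : emitted) {
            sum += delta;
        }
        return sum;
    }
}
```

In this model the running sum only drops when the merged window closes, so between the merge and the close it still counts both original windows.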
>>>>>>
>>>>>> Hope this helps,
>>>>>> Fabian
>>>>>>
>>>>>> 2018-02-20 9:54 GMT+01:00 Dongwon Kim <eastcirc...@gmail.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> This could be a totally stupid question, but I currently have no idea how to get the number of active session windows from a running job.
>>>>>>>
>>>>>>> Our traffic trajectory application (which handles up to 10,000 tps) uses event-time session windows on a KeyedStream (keyed by userID).
>>>>>>>
>>>>>>> Should I write another Flink job for this purpose?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Dongwon Kim