Without modifications to Flink? No. By design, nothing can intercept or retrieve metrics through the metrics API. For this pattern, the usual recommendation is to explicitly pass the metric to the components that require it.

If modifications are an option, you could
* define a counter in the OperatorIOMetricGroup,
* have the operator checkpoint and restore the counter, and
* access it in the trigger by casting your way through the MetricGroups to an OperatorMetricGroup, from which you can retrieve the OperatorIOMetricGroup.
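The recipe above can be modeled without Flink as a minimal sketch under stated assumptions. `CountingTrigger`, `SessionCountingOperator`, `snapshot()` and `restore()` are hypothetical names; an `AtomicLong` stands in for the counter that would live in the OperatorIOMetricGroup, and the returned `long` stands in for the value the operator writes into state during a checkpoint. The trigger never looks the counter up; its owner passes it in explicitly.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical trigger-side component. It never looks the counter up; the
// owner hands it the instance (the "pass the metric explicitly" pattern).
class CountingTrigger {
    private final AtomicLong activeSessions;

    CountingTrigger(AtomicLong activeSessions) {
        this.activeSessions = activeSessions;
    }

    // An element was added to a session window.
    void onElement() {
        activeSessions.incrementAndGet();
    }

    // A window was merged away or expired.
    void clear() {
        activeSessions.decrementAndGet();
    }
}

// Hypothetical operator that owns the counter and checkpoints it manually.
class SessionCountingOperator {
    // Stand-in for a counter registered in the operator's metric group.
    private final AtomicLong activeSessions = new AtomicLong();
    final CountingTrigger trigger = new CountingTrigger(activeSessions);

    // On checkpoint: copy the current value into (operator) state.
    long snapshot() {
        return activeSessions.get();
    }

    // On restore: a freshly created counter starts at 0, so add the saved value back.
    void restore(long savedValue) {
        activeSessions.addAndGet(savedValue);
    }

    long activeSessions() {
        return activeSessions.get();
    }
}
```

Restoring by adding rather than assigning mirrors Flink's `Counter` interface, which offers `inc`/`dec` (optionally by an amount) and `getCount()` but no setter.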


On 21.06.2018 11:16, Fabian Hueske wrote:
Hi Dongwon,

Yes, the counter state should be stored in operator state, which is not available to Triggers. Chesnay: can a window function (like ProcessWindowFunction) access (read and write) the counter of its associated Trigger in order to checkpoint and restore it?

Best, Fabian

2018-06-20 16:59 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com <mailto:eastcirc...@gmail.com>>:

    Hi Fabian and Chesnay,

    As Chesnay pointed out, it seems that I need to write the current
    counter (which is defined inside the Trigger) into state, which I
    think should be the operator state of the window operator.
    However, as I said before, TriggerContext only allows users to
    access partitioned state that is scoped to *the key and the
    window* of the current Trigger invocation.
    There's no way for me to access the operator state of the
    window operator through TriggerContext.
    Partitioned state doesn't seem suitable because we have more than
    *ten million keys*.
    This number of keys could possibly break down the metric system
    as well as external metric systems like Ganglia and Prometheus.

    What I want most is to achieve the goal with the current API
    (I'm using Flink 1.4.2) without modifications.
    But a change to TriggerContext seems unavoidable, because it would
    have to expose an additional method that lets users like me access
    the operator state of the window operator.

    Thank you guys for the useful discussion.

    p.s. Fabian, yes you're right. It is Trigger.clear(), not
    Trigger.onClose().

    Best,
    - Dongwon


    On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler
    <ches...@apache.org <mailto:ches...@apache.org>> wrote:

        Checkpointing of metrics is a manual process.
        The operator must write the current value into state, retrieve
        it on restore and restore the counter's count.


        On 20.06.2018 12:10, Fabian Hueske wrote:
        Hi Dongwon,

        You are of course right! We need to decrement the counter
        when the window is closed.

        The idea of using the Trigger.clear() method (the clean-up
        method is called clear(), not onClose()) is great!
        It will be called when the window is closed but also when it
        is merged.
        So, I think you are right and we only need to increment the
        counter in Trigger.onElement() and decrement it in Trigger.clear().
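A simplified, Flink-free model of this bookkeeping for one operator instance: +1 in `onElement()` and -1 in `clear()`, where `clear()` runs for every window that is merged away and for every window that expires. `SessionCountModel` and its methods are hypothetical. Windows are modeled as `{start, end}` pairs with an exclusive end, a window is assumed to expire once the watermark reaches its last timestamp (`end - 1`), and per-key timestamps are assumed to be strictly ascending, as Dongwon assumes in this thread, so every new element extends its session.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simplified model of one window-operator instance with event-time session
// windows. Not Flink code: all names are hypothetical.
class SessionCountModel {
    private final long gap;
    // key -> open session windows of that key as {start, end}, end exclusive
    private final Map<String, List<long[]>> openWindows = new HashMap<>();
    private long activeSessions; // the operator-wide counter

    SessionCountModel(long gap) {
        this.gap = gap;
    }

    void processElement(String key, long timestamp) {
        List<long[]> windows = openWindows.computeIfAbsent(key, k -> new ArrayList<>());
        long start = timestamp;
        long end = timestamp + gap;
        Iterator<long[]> it = windows.iterator();
        while (it.hasNext()) {
            long[] w = it.next();
            if (w[0] <= end && w[1] >= start) { // overlapping (touching counts too)
                start = Math.min(start, w[0]);
                end = Math.max(end, w[1]);
                it.remove();
                clear(); // the pre-existing window is merged away: -1
            }
        }
        windows.add(new long[] {start, end});
        onElement(); // the element lands in the (possibly merged) window: +1
    }

    void advanceWatermark(long watermark) {
        for (List<long[]> windows : openWindows.values()) {
            Iterator<long[]> it = windows.iterator();
            while (it.hasNext()) {
                if (it.next()[1] - 1 <= watermark) { // last timestamp reached
                    it.remove();
                    clear(); // expired window: -1
                }
            }
        }
    }

    private void onElement() { activeSessions++; }

    private void clear() { activeSessions--; }

    long activeSessions() { return activeSessions; }

    long openWindowCount() {
        long n = 0;
        for (List<long[]> windows : openWindows.values()) n += windows.size();
        return n;
    }
}
```

In this model the counter always equals the number of open windows: every -1 removes exactly one window and every +1 adds exactly one.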

        I'm not 100% sure, but I doubt that metrics can be
        checkpointed. Chesnay (in CC) would know that.
        I'm not sure what the best approach would be if you need a
        fault-tolerant solution.

        Best, Fabian




        2018-06-19 16:38 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com
        <mailto:eastcirc...@gmail.com>>:

            Hi Fabian,
            Thanks a lot for your reply.

                Do you need the number of active session windows as a
                DataStream or would you like to have it as a metric
                that you can expose?
                If possible, I would recommend exposing it as a
                metric because they are usually easier to collect.

            I want to have it as a metric and it doesn't look
            difficult thanks to the metric system exposed by
            TriggerContext.

                In order to track how many session windows exist, we
                would need to increment a counter by one when a new
                window is created (or an element is assigned to a
                window, which is equivalent for session windows)

            I agree with you that we need to increment a counter when
            Trigger.onElement() is called due to the characteristic
            of session windows.

                and decrement the counter when windows are merged by
                the number of merged windows minus one.

            You suggest decrementing the counter when windows are
            merged, but I think we also need to decrement it when a
            window expires.

                However, decrementing the counter is difficult.
                Although the Trigger.onMerge() method is called, it
                does not know how many windows were merged (which is
                done by the WindowAssigner) and only sees the merged
                window.

            We assume that timestamps of records from a user are in
            ascending order, so only one window is closed at a time
            which simplifies the problem of how to decrement the counter.
            Nevertheless, I think I need to decrement the counter in
            Trigger.onClose(), not Trigger.onMerge().
            By doing that in Trigger.onClose(), we can take care of
            both cases: when a window is merged and when a window is
            expired.
            What do you think?

            The reason I mention state is to calculate the exact
            number of active sessions even after my Flink application
            is restarted from checkpoints or savepoints.
            If we restore from a savepoint and the counter is
            initialized to 0, we'll see an incorrect value from a
            dashboard.
            This is the biggest concern of mine at this point.

            Best,

            - Dongwon


            On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske
            <fhue...@gmail.com <mailto:fhue...@gmail.com>> wrote:

                Hi Dongwon,

                Do you need the number of active session windows as a
                DataStream or would you like to have it as a metric
                that you can expose?
                If possible, I would recommend exposing it as a
                metric because they are usually easier to collect.

                SessionWindows work internally as follows:
                - every new record is added to a new window that
                starts at the timestamp of the record and ends at
                timestamp + gap size. When a record is added to a
                window, Trigger.onElement() is called.
                - after a window is created, the session window
                assigner tries to merge windows with overlapping
                ranges. When windows are merged, Trigger.onMerge() is
                called.

                In order to track how many session windows exist, we
                would need to increment a counter by one when a new
                window is created (or an element is assigned to a
                window, which is equivalent for session windows) and
                decrement the counter when windows are merged by the
                number of merged windows minus one.

                Incrementing the counter is rather easy and can be
                done in Trigger.onElement(), either by using state or
                a Counter metric (Triggers have access to the metric
                system).
                However, decrementing the counter is difficult.
                Although the Trigger.onMerge() method is called, it
                does not know how many windows were merged (which is
                done by the WindowAssigner) and only sees the merged
                window. There might be a way to maintain state in a
                Trigger that allows it to infer how many windows were
                merged.
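The proposed accounting can be modeled for a single key without Flink, assuming the merge step itself knows how many windows it combined. `MergeAccountingModel` is a hypothetical name, windows are `{start, end}` pairs with an exclusive end, and expiry is deliberately left out, as in the proposal.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Single-key model of session-window merging with the proposed accounting:
// +1 for every newly created window, -(n - 1) when n windows merge into one.
// Not Flink code; expired windows are not handled.
class MergeAccountingModel {
    private final long gap;
    private final List<long[]> windows = new ArrayList<>(); // {start, end}, end exclusive
    private long activeSessions;

    MergeAccountingModel(long gap) {
        this.gap = gap;
    }

    void processElement(long timestamp) {
        long start = timestamp;
        long end = timestamp + gap;
        activeSessions += 1; // every element creates a new window
        int merged = 1;      // the new window itself
        Iterator<long[]> it = windows.iterator();
        while (it.hasNext()) {
            long[] w = it.next();
            if (w[0] <= end && w[1] >= start) { // overlapping (touching counts too)
                start = Math.min(start, w[0]);
                end = Math.max(end, w[1]);
                it.remove();
                merged++;
            }
        }
        // n is only known here, where the windows are merged. A trigger's
        // onMerge() sees just the resulting window, not this count.
        activeSessions -= merged - 1;
        windows.add(new long[] {start, end});
    }

    long activeSessions() { return activeSessions; }

    int windowCount() { return windows.size(); }
}
```

Feeding timestamps 0, 15 and 8 with a gap of 10 exercises an element that bridges two sessions: three windows merge, so the counter moves +1 and then -2, ending at one active session.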

                Best, Fabian

                2018-06-16 16:39 GMT+02:00 Dongwon Kim
                <eastcirc...@gmail.com <mailto:eastcirc...@gmail.com>>:

                    Hi Fabian,

                    I'm still eager to expose the number of active
                    sessions as a key metric of our service, but I
                    haven’t figured it out yet.

                    First of all, I want to ask you some questions
                    regarding your suggestion.
                        You could implement a Trigger that fires when a
                        new window is created and when the window is
                        closed. A ProcessWindowFunction would emit a +1
                        if the window was created and a -1 when the
                        window is closed.
                        Session windows are a bit special, because you
                        also need to handle the case of merging windows,
                        i.e., two opened windows can be merged and only
                        one (the merged) window is closed. So you would
                        need to emit a -2 if a merged window is closed
                        (assuming only two windows were merged).
                    Q1)
                    How to fire when a new window is created and when
                    the window is closed?
                    AFAIK, we can return TriggerResult only through
                    the three functions: onElement, onEventTime, and
                    onProcessingTime.
                    Q2)
                    Firing emits the elements of a window to the
                    window function; it does not emit values like +1,
                    -1 and -2, which are not in the windows.
                    Or am I missing something that you meant?
                        In order to do that, you'd need to carry the
                        merging information forward. The Trigger.onMerge
                        method cannot trigger the window function, but
                        it could store the merging information in state
                        that is later accessed.
                    Q3)
                    I didn't understand what you mean at all. What do
                    you mean by carrying the merging information?

                    Besides your suggestion, I implemented a custom
                    trigger which is almost the same as
                    EventTimeTrigger except for the following:
                    - it maintains a variable to count sessions in an
                    instance of a window operator
                    - it increases the variable by 1 when onElement
                    is invoked
                    - it decreases the variable by 1 when onClose is
                    invoked
                    Considering the logic of Flink’s session window,
                    it correctly counts sessions in an instance of a
                    window operator.

                    As you might have already noticed, this approach
                    has a critical problem: there's no way to
                    maintain an operator state inside a trigger.
                    TriggerContext only allows interacting with state
                    that is scoped to the window and the key of the
                    current trigger invocation (as shown in
                    Trigger#TriggerContext).

                    Now I've come to the conclusion that it might not
                    be possible using the DataStream API.
                    Or do I need to think in a totally
                    different way to achieve the goal?

                    Best,

                    - Dongwon



                    On Feb 20, 2018, at 6:53 PM, Fabian Hueske
                    <fhue...@gmail.com <mailto:fhue...@gmail.com>> wrote:

                    Hi Dongwon Kim,

                    That's an interesting question.

                    I don't have a solution blueprint for you, but a
                    few ideas that should help to solve the problem.

                    I would start with a separate job first and
                    later try to integrate it with the other job.
                    You could implement a Trigger that fires when a
                    new window is created and when the window is
                    closed. A ProcessWindowFunction would emit a +1
                    if the window was created and a -1 when the
                    window is closed.
                    Session windows are a bit special, because you
                    also need to handle the case of merging windows,
                    i.e., two opened windows can be merged and only
                    one (the merged) window is closed. So you would
                    need to emit a -2 if a merged window is closed
                    (assuming only two windows were merged).
                    In order to do that, you'd need to carry the
                    merging information forward. The Trigger.onMerge
                    method cannot trigger the window function, but
                    it could store the merging information in state
                    that is later accessed.

                    Hope this helps,
                    Fabian

                    2018-02-20 9:54 GMT+01:00 Dongwon Kim
                    <eastcirc...@gmail.com
                    <mailto:eastcirc...@gmail.com>>:

                        Hi,

                        It could be a totally stupid question but I
                        currently have no idea how to get the number
                        of active session windows from a running job.

                        Our traffic trajectory application (which
                        handles up to 10,000 tps) uses event-time
                        session windows on a KeyedStream (keyed by userID).

                        Should I write another Flink job for this
                        purpose?

                        Cheers,

                        Dongwon Kim









