Checkpointing of metrics is a manual process.
The operator must write the current value into state, retrieve it on restore, and re-apply it to the counter.
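
The sketch below is a minimal, framework-free model of this pattern and does not use Flink's API. `CounterModel` is a hypothetical stand-in that copies the inc/dec/getCount shape of Flink's `Counter`. `snapshotState`/`initializeState` only borrow the names of the `CheckpointedFunction` hooks. A plain `List<Long>` stands in for operator state.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a metric counter (same inc/dec/getCount shape as Flink's Counter). */
final class CounterModel {
    private long count;
    void inc() { count++; }
    void inc(long n) { count += n; }
    void dec() { count--; }
    long getCount() { return count; }
}

/** Hypothetical operator that mirrors its counter metric into checkpointed state. */
final class CheckpointedSessionCounter {
    // Metrics are not restored automatically: after every (re)start the counter begins at 0.
    private final CounterModel activeSessions = new CounterModel();

    void onSessionOpened() { activeSessions.inc(); }
    void onSessionClosed() { activeSessions.dec(); }

    /** On checkpoint: write the counter's current value into state. */
    List<Long> snapshotState() {
        return Collections.singletonList(activeSessions.getCount());
    }

    /** On restore: read the stored value(s) back and re-apply them to the fresh counter. */
    void initializeState(List<Long> restored) {
        long total = 0;
        for (long value : restored) {
            total += value; // summing keeps the total if more than one value is handed to this instance
        }
        activeSessions.inc(total);
    }

    long currentValue() { return activeSessions.getCount(); }
}
```

Without the restore step, a job restarted from a savepoint would report 0 until enough sessions opened and closed again. That is the dashboard problem described further down in this thread.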

On 20.06.2018 12:10, Fabian Hueske wrote:
Hi Dongwon,

You are of course right! We need to decrement the counter when the window is closed.

Your idea of using the clean-up method is great (it is called Trigger.clear(), not onClose())!
It is called when the window is closed and also when it is merged.
So, I think you are right: we only need to increment the counter in Trigger.onElement() and decrement it in Trigger.clear().
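
The counting rule agreed on here can be checked with a small framework-free model: increment in `onElement()`, decrement in `clear()`. All class and method names are hypothetical. The model assumes strictly ascending timestamps per key, as Dongwon does, so a new record can only extend the most recent session of its key. It treats a window merged away and a window closed by the watermark as the same clean-up call.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model (not Flink's API) of a counting trigger on event-time session
 * windows: +1 in onElement(), -1 in clear(). Assumes strictly ascending timestamps per key.
 */
final class ClearBasedSessionCounter {
    private final long gap;
    // Per key: open session windows as {start, end}, ordered by start.
    private final Map<String, Deque<long[]>> openWindows = new HashMap<>();
    private long activeSessions; // value the trigger's counter metric would report

    ClearBasedSessionCounter(long gap) { this.gap = gap; }

    void onRecord(String key, long timestamp) {
        Deque<long[]> sessions = openWindows.computeIfAbsent(key, k -> new ArrayDeque<>());
        long[] last = sessions.peekLast();
        if (last != null && timestamp < last[1]) {
            // [timestamp, timestamp + gap) overlaps the latest session: the old window is
            // merged away and cleared, and the extended window receives the element.
            sessions.pollLast();
            clear();
            sessions.addLast(new long[] {last[0], timestamp + gap});
        } else {
            sessions.addLast(new long[] {timestamp, timestamp + gap});
        }
        onElement();
    }

    /** Closes every window whose last timestamp (end - 1) is covered by the watermark. */
    void advanceWatermark(long watermark) {
        for (Deque<long[]> sessions : openWindows.values()) {
            while (!sessions.isEmpty() && sessions.peekFirst()[1] - 1 <= watermark) {
                sessions.pollFirst();
                clear(); // expired window: same clean-up hook as for a merged window
            }
        }
    }

    private void onElement() { activeSessions++; }
    private void clear() { activeSessions--; }

    long activeSessions() { return activeSessions; }

    long openWindowCount() {
        long n = 0;
        for (Deque<long[]> sessions : openWindows.values()) n += sessions.size();
        return n;
    }
}
```

Take a gap of 10 and records a@0, a@5, b@3, a@40. They leave three open sessions, and the counter reads 3. A watermark of 14 then closes a's first session and b's session, which leaves one session and a counter of 1.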

I'm not 100% sure, but I doubt that metrics can be checkpointed. Chesnay (in CC) would know. I'm not sure what the best approach would be if you need a fault-tolerant solution.

Best, Fabian




2018-06-19 16:38 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com <mailto:eastcirc...@gmail.com>>:

    Hi Fabian,
    Thanks a lot for your reply.

        Do you need the number of active session windows as a
        DataStream or would you like to have it as a metric that you
        can expose?
        If possible, I would recommend exposing it as a metric because
        metrics are usually easier to collect.

    I want to have it as a metric and it doesn't look difficult thanks
    to the metric system exposed by TriggerContext.

        In order to track how many session windows exist, we would
        need to increment a counter by one when a new window is
        created (or an element is assigned to a window, which is
        equivalent for session windows)

    I agree with you that, given the characteristics of session
    windows, we need to increment a counter whenever
    Trigger.onElement() is called.

        and decrement the counter when windows are merged by the
        number of merged windows minus one.

    You suggest decrementing the counter when windows are merged, but I
    think we also need to decrement it when a window expires.

        However, decrementing the counter is difficult. Although the
        Trigger.onMerge() method is called, it does not know how many
        windows were merged (which is done by the WindowAssigner) and
        only sees the merged window.

    We assume that timestamps of records from a user are in ascending
    order, so only one window is closed at a time, which simplifies the
    problem of how to decrement the counter.
    Nevertheless, I think I need to decrement the counter in
    Trigger.onClose(), not Trigger.onMerge().
    By doing that in Trigger.onClose(), we can take care of both
    cases: when a window is merged and when a window expires.
    What do you think?

    The reason I mention state is that I want the exact number of
    active sessions even after my Flink application is restarted from
    a checkpoint or savepoint.
    If we restore from a savepoint and the counter is initialized to
    0, we'll see an incorrect value on the dashboard.
    This is my biggest concern at this point.

    Best,

    - Dongwon


    On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske <fhue...@gmail.com
    <mailto:fhue...@gmail.com>> wrote:

        Hi Dongwon,

        Do you need the number of active session windows as a
        DataStream or would you like to have it as a metric that you
        can expose?
        If possible, I would recommend exposing it as a metric because
        metrics are usually easier to collect.

        SessionWindows work internally as follows:
        - every new record is added to a new window that starts at the
        timestamp of the record and ends at timestamp + gap size. When
        a record is added to a window, Trigger.onElement() is called.
        - after a window is created, the session window assigner
        tries to merge windows with overlapping ranges. When windows
        are merged, Trigger.onMerge() is called.

        In order to track how many session windows exist, we would
        need to increment a counter by one when a new window is
        created (or an element is assigned to a window, which is
        equivalent for session windows) and decrement the counter when
        windows are merged by the number of merged windows minus one.
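
Here is a hypothetical plain-Java model (no Flink classes) that applies this merge-based rule literally. It shows why the rule tracks the number of windows. Each record gets a new window [timestamp, timestamp + gap), and overlapping windows of the same key are merged. The counter is decremented by n - 1 when n windows merge. Window expiry is left out on purpose. Unlike a real Trigger, the model knows n, which is exactly the information Trigger.onMerge() lacks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the merge-based rule: +1 per newly created window,
 * -(n - 1) whenever n windows are merged into one. Expiry is not modeled.
 */
final class MergeRuleSessionCounter {
    private final long gap;
    // Per key: pairwise non-overlapping windows as {start, end}.
    private final Map<String, List<long[]>> windows = new HashMap<>();
    private long counter;

    MergeRuleSessionCounter(long gap) { this.gap = gap; }

    void onRecord(String key, long timestamp) {
        List<long[]> existing = windows.computeIfAbsent(key, k -> new ArrayList<>());
        long start = timestamp;
        long end = timestamp + gap;
        counter++;       // every record first gets its own new window
        int merged = 1;  // windows taking part in the merge, including the new one
        for (Iterator<long[]> it = existing.iterator(); it.hasNext(); ) {
            long[] w = it.next();
            if (w[0] < end && start < w[1]) { // overlapping ranges are merged
                start = Math.min(start, w[0]);
                end = Math.max(end, w[1]);
                it.remove();
                merged++;
            }
        }
        counter -= merged - 1; // n windows became one
        existing.add(new long[] {start, end});
    }

    long counter() { return counter; }

    long windowCount() {
        long n = 0;
        for (List<long[]> ws : windows.values()) n += ws.size();
        return n;
    }
}
```

A single pass over the existing windows is enough here. They never overlap each other, so a window that does not overlap the new range cannot overlap its union with another existing window either.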

        Incrementing the counter is rather easy and can be done in
        Trigger.onElement(), either by using state or a Counter metric
        (Triggers have access to the metric system).
        However, decrementing the counter is difficult. Although the
        Trigger.onMerge() method is called, it does not know how many
        windows were merged (which is done by the WindowAssigner) and
        only sees the merged window. There might be a way to maintain
        state in a Trigger that makes it possible to infer how many
        windows were merged.

        Best, Fabian

        2018-06-16 16:39 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com
        <mailto:eastcirc...@gmail.com>>:

            Hi Fabian,

            I'm still eager to expose the number of active sessions as a
            key metric of our service, but I haven't figured out how yet.

            First of all, I want to ask you some questions regarding
            your suggestion.
                You could implement a Trigger that fires when a new
                window is created and when the window is closed. A
                ProcessWindowFunction would emit a +1 if the window was
                created and a -1 when the window is closed.
                Session windows are a bit special, because you also need
                to handle the case of merging windows, i.e., two open
                windows can be merged and only one (the merged) window is
                closed. So you would need to emit a -2 if a merged window
                is closed (assuming only two windows were merged).
            Q1)
            How do I fire when a new window is created and when the
            window is closed?
            AFAIK, we can return a TriggerResult only from three
            methods: onElement, onEventTime, and onProcessingTime.
            Q2)
            Firing emits the elements of a window to the window
            function; it does not emit values like +1, -1 and -2, which
            are not in the window.
            Or am I missing something that you meant?
                In order to do that, you'd need to carry the merging
                information forward. The Trigger.onMerge method cannot
                trigger the window function, but it could store the
                merging information in state that is later accessed.
            Q3)
            I don't understand this part at all. What do you mean by
            carrying the merging information forward?

            Apart from your suggestion, I implemented a custom trigger
            which is almost the same as EventTimeTrigger except for the
            following:
            - it maintains a variable to count sessions in an instance
            of a window operator
            - it increases the variable by 1 when onElement is invoked
            - it decreases the variable by 1 when onClose is invoked
            Given how Flink's session windows work, it correctly counts
            the sessions in an instance of a window operator.

            As you might have already noticed, this approach has a
            critical problem: there's no way to maintain operator
            state inside a trigger.
            TriggerContext only allows interacting with state that is
            scoped to the window and the key of the current trigger
            invocation (as shown in Trigger#TriggerContext).

            Now I've come to the conclusion that it might not be
            possible using the DataStream API.
            Or do I need to think in a totally different way to
            achieve the goal?

            Best,

            - Dongwon



            On 20 Feb 2018, at 6:53 PM, Fabian Hueske <fhue...@gmail.com
            <mailto:fhue...@gmail.com>> wrote:

            Hi Dongwon Kim,

            That's an interesting question.

            I don't have a solution blueprint for you, but a few
            ideas that should help to solve the problem.

            I would start with a separate job and later try to
            integrate it with the other job.
            You could implement a Trigger that fires when a new
            window is created and when the window is closed. A
            ProcessWindowFunction would emit a +1 if the window was
            created and a -1 when the window is closed.
            Session windows are a bit special, because you also need
            to handle the case of merging windows, i.e., two open
            windows can be merged and only one (the merged) window is
            closed. So you would need to emit a -2 if a merged window
            is closed (assuming only two windows were merged).
            In order to do that, you'd need to carry the merging
            information forward. The Trigger.onMerge method cannot
            trigger the window function, but it could store the
            merging information in state that is later accessed.
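
This sketch models the +1 / -n delta scheme, with the merge information carried forward in per-window state. It is plain Java with hypothetical names. A map stands in for the window-scoped state that Trigger.onMerge() could write. A list of emitted deltas stands in for the ProcessWindowFunction's output, which a downstream operator sums.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the +1 / -n scheme. Not Flink code: the map stands in for
 * window-scoped trigger state, the list for the ProcessWindowFunction's output.
 */
final class DeltaSchemeModel {
    // Per-window state: how many originally created windows this window represents.
    private final Map<String, Integer> windowState = new HashMap<>();
    // Deltas emitted by the window function, in emission order.
    private final List<Integer> emitted = new ArrayList<>();

    /** A new window fires once on creation and emits +1. */
    void create(String window) {
        windowState.put(window, 1);
        emitted.add(1);
    }

    /** Like Trigger.onMerge(): emits nothing, but carries the merge information forward in state. */
    void merge(String result, List<String> mergedWindows) {
        int represented = 0;
        for (String w : mergedWindows) {
            represented += windowState.remove(w);
        }
        windowState.put(result, represented);
    }

    /** On close, the window emits minus the number of windows it represents (-1, -2, ...). */
    void close(String window) {
        emitted.add(-windowState.remove(window));
    }

    List<Integer> deltas() { return new ArrayList<>(emitted); }

    /** What a downstream operator summing all deltas would report. */
    int runningTotal() {
        int sum = 0;
        for (int d : emitted) sum += d;
        return sum;
    }
}
```

Suppose you create w1 and w2, merge them, and then close the merged window. The emitted deltas are +1, +1, -2. Between the merge and the close, the running total is still 2. In this scheme, the total counts created windows that have not been closed yet, not live windows. It becomes exact again once the merged window closes.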

            Hope this helps,
            Fabian

            2018-02-20 9:54 GMT+01:00 Dongwon Kim
            <eastcirc...@gmail.com <mailto:eastcirc...@gmail.com>>:

                Hi,

                This may be a stupid question, but I currently have no
                idea how to get the number of active session windows
                from a running job.

                Our traffic trajectory application (which handles up
                to 10,000 tps) uses event-time session windows on a
                KeyedStream (keyed by userID).

                Should I write another Flink job for the purpose?

                Cheers,

                Dongwon Kim






