1) Outside of small-scale tests, the periodic emission of watermarks should keep the latency bounded; your test just runs so quickly that it never triggers. As for the triggering element being emitted first: this happens because watermarks are not really attached to elements. The watermark generator accepts an element, determines the watermark, outputs the element, and then outputs the watermark.
IOW, watermarks trail the "normal" elements in the stream.
Hence the order of operations is: the element arrives at your ProcessFunction and is emitted; then the watermark arrives at your PF, advances time, and the timers are fired.
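To make that ordering concrete, here is a toy single-threaded model in plain Java (no Flink dependency) of a keyed timeout function fed by a watermark that is emitted after every element. The names `WatermarkOrderSim`, `Event` and `simulate` are hypothetical. The model assumes the bounded-out-of-orderness formula watermark = maxTimestamp - bound - 1 and that an event-time timer fires once the watermark reaches its timestamp; it is a sketch of the behaviour, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of a keyed timeout job. NOT Flink code: it only mimics
 * (a) a bounded-out-of-orderness watermark emitted after every element and
 * (b) event-time timers that fire once the watermark reaches their timestamp.
 */
class WatermarkOrderSim {

    record Event(long ts, String id) {}

    static List<String> simulate(List<Event> input, long timeoutMs, long boundMs) {
        List<String> out = new ArrayList<>();
        Map<String, Long> timerByKey = new HashMap<>(); // at most one pending timer per key
        long maxTs = Long.MIN_VALUE + boundMs + 1;      // lowest possible watermark is Long.MIN_VALUE

        for (Event e : input) {
            // 1) The element reaches the process function first; replacing the key's
            //    entry models "cancel the previous timer, register a new one".
            timerByKey.put(e.id(), e.ts() + timeoutMs);
            out.add("event id=" + e.id() + " ts=" + e.ts());

            // 2) Only afterwards does the trailing watermark arrive.
            maxTs = Math.max(maxTs, e.ts());
            long watermark = maxTs - boundMs - 1;

            // 3) Fire every timer whose timestamp <= watermark (ties broken by key).
            List<Map.Entry<String, Long>> due = new ArrayList<>();
            for (Map.Entry<String, Long> t : timerByKey.entrySet()) {
                if (t.getValue() <= watermark) {
                    due.add(Map.entry(t.getKey(), t.getValue()));
                }
            }
            due.sort((a, b) -> a.getValue().equals(b.getValue())
                    ? a.getKey().compareTo(b.getKey())
                    : Long.compare(a.getValue(), b.getValue()));
            for (Map.Entry<String, Long> t : due) {
                timerByKey.remove(t.getKey());
                out.add("timeout id=" + t.getKey() + " ts=" + t.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event(0, "0"), new Event(0, "1"), new Event(0, "2"),
                new Event(1000, "0"), new Event(1000, "1"),
                new Event(2000, "0"), new Event(2000, "1"),
                new Event(3000, "0"), new Event(3000, "1"),
                new Event(4000, "0"));
        simulate(input, 1000, 0).forEach(System.out::println);
    }
}
```

Fed the event sequence from Pilgrim's log with a 0 ms bound, the model emits the id 2 timeout (timer at 1000) right after the first ts=2000 element: an element with ts=1000 only raises the watermark to 999, so it takes a ts=2000 element to push it past 1000. With a 1 s bound the timeout moves to after the first ts=3000 element, which matches the "4 events late" log quoted further down.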

3) The OutOfOrderness parameter is a trade-off between memory usage, latency, and tolerance for out-of-order data in your input source. The higher the OutOfOrderness, the longer Flink has to keep window state around (because another element could still arrive), and consequently the more windows can be active at once.
As such, the lower this value is, the fewer resources Flink will consume.
You want this value to be as low as possible while still handling as much of the out-of-order data as your use case requires. If you can guarantee that all input elements are sorted by their timestamp, then setting it to 0 is ok.
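The trade-off can be put in numbers. The sketch below is plain arithmetic, not Flink code. It assumes the watermark is maxTimestamp - bound - 1, that a tumbling window [start, end) fires once the watermark reaches end - 1, and that every window received at least one element; `OutOfOrdernessMath` and its methods are hypothetical names.

```java
/**
 * Back-of-the-envelope arithmetic for a bounded-out-of-orderness watermark
 * and tumbling event-time windows. Plain Java, NOT Flink code.
 */
class OutOfOrdernessMath {

    /** Watermark once the largest timestamp seen so far is maxTs. */
    static long watermark(long maxTs, long boundMs) {
        return maxTs - boundMs - 1;
    }

    /** True once the window ending at windowEnd (exclusive) has fired. */
    static boolean windowFired(long windowEnd, long maxTs, long boundMs) {
        return watermark(maxTs, boundMs) >= windowEnd - 1;
    }

    /**
     * Number of tumbling windows of size sizeMs that hold data up to maxTs but
     * have not fired yet, i.e. whose state would still have to be kept.
     */
    static long openWindows(long maxTs, long boundMs, long sizeMs) {
        long firstOpen = Math.floorDiv(watermark(maxTs, boundMs) + 1, sizeMs); // oldest unfired window
        long newest = Math.floorDiv(maxTs, sizeMs);                            // window containing maxTs
        return Math.max(0, newest - firstOpen + 1);
    }

    public static void main(String[] args) {
        long minute = 60_000;
        System.out.println(openWindows(10 * minute, 0, minute));          // bound 0
        System.out.println(openWindows(10 * minute, 2 * minute, minute)); // bound 2 min
    }
}
```

For one-minute windows and a max timestamp of 10 minutes, a 0 ms bound leaves one window per key open while a 2-minute bound leaves three. In general a window only fires once the max timestamp is at least window end + bound, which is the "wait 2 minutes past the window end" behaviour asked about in 4).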

4) You can set the allowed lateness individually for each window operation.
See https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#allowed-lateness
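In the DataStream API this is set per windowed stream with `allowedLateness(...)`, for example `.window(TumblingEventTimeWindows.of(Time.minutes(1))).allowedLateness(Time.minutes(2))`. Note that with the default event-time trigger, allowed lateness does not hold the first result back: the window still fires when the watermark passes its end, and fires again for each late element that arrives within the lateness. The plain-Java model below (hypothetical names, not Flink code) classifies an arriving element under those assumptions.

```java
/**
 * Toy classification of an element arriving for window [start, end) while the
 * operator's watermark is `watermark`. Plain Java, NOT Flink code; it mimics the
 * documented allowedLateness behaviour with the default event-time trigger.
 */
class AllowedLatenessModel {

    enum Fate { ON_TIME, LATE_REFIRES_WINDOW, DROPPED }

    static Fate classify(long windowEnd, long watermark, long allowedLatenessMs) {
        long maxTimestamp = windowEnd - 1; // last timestamp that still belongs to the window
        if (maxTimestamp > watermark) {
            return Fate.ON_TIME;             // window has not fired yet
        }
        if (maxTimestamp + allowedLatenessMs > watermark) {
            return Fate.LATE_REFIRES_WINDOW; // state still kept: window fires again with this element
        }
        return Fate.DROPPED;                 // state already cleaned up: element is discarded as late
    }

    public static void main(String[] args) {
        long minute = 60_000;
        // Window ending at 1 min, element arriving when the watermark is at 2 min.
        System.out.println(classify(minute, 2 * minute, 0));
        System.out.println(classify(minute, 2 * minute, 2 * minute));
    }
}
```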

On 1/28/2021 7:52 PM, Pilgrim Beart wrote:
Chesnay,
1) Correct, I'd like the timeout event (generated at eventTime==1000) to appear in its correct time sequence in the output, i.e. before eventTime exceeds 1000. It's great that Flink can deal with out-of-orderness, but I didn't expect it to spontaneously create it (especially with parallelism==1). In the previous case the timeout was emitted late by 2 seconds (4 events). So I was wondering: how late could it be? I dialled the Duration of the WatermarkGenerator's BoundedOutOfOrderness down to 0, and the timeout now appears only slightly late, as in the log output below. By inserting extra timestamps, I've demonstrated that it is "1 event" late rather than "1 second" late. It's as if the watermark generator realises that time is advancing, so it triggers the timeout, but only after emitting the event that advanced time?
At least this feels deterministic.
However, relying on the presence of that "forcing" event seems non-ideal: if there just happens not to be one, due to a gap in the other ID streams, we'll get unbounded latency in our timeouts, which means we can't offer downstream systems any out-of-orderness guarantee.

2) Apart from that unbounded latency concern, it's a fair point that if I'm going to partition the output by ID anyway, this isn't a huge problem.
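If the output is going to be partitioned by ID anyway, a test can check ordering per ID rather than globally. A minimal plain-Java helper, assuming the sink output has already been parsed into (ts, id) pairs; `PerKeyOrderCheck` and its methods are hypothetical names, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Test helper sketch: split a job's output by device id and check that each
 * id's timestamps are in order, instead of requiring a globally sorted stream.
 */
class PerKeyOrderCheck {

    record Out(long ts, String id) {}

    /** Groups timestamps by id, preserving output order within each id. */
    static Map<String, List<Long>> byKey(List<Out> output) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Out o : output) {
            grouped.computeIfAbsent(o.id(), k -> new ArrayList<>()).add(o.ts());
        }
        return grouped;
    }

    /** True if the timestamps never go backwards. */
    static boolean nonDecreasing(List<Long> ts) {
        for (int i = 1; i < ts.size(); i++) {
            if (ts.get(i) < ts.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    /** True if every id's own sequence of timestamps is in order. */
    static boolean eachKeyInOrder(List<Out> output) {
        return byKey(output).values().stream().allMatch(PerKeyOrderCheck::nonDecreasing);
    }
}
```

With the records from the log in this message, the global sequence is out of order (the ts=1000 timeout for id 2 follows a ts=2000 element), but every ID's own sequence is in order.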

3) Is there any negative effect of setting the BoundedOutOfOrderness duration to 0? Does it somehow make Flink less efficient?

4) In a subsequent stage, we want to do time-window aggregation (but only within, not across, IDs). Setting the watermark duration to 0 will make the window emit immediately. But we want data that arrives less than 2 minutes late not to be considered late, i.e. don't emit any window until the latest event time is at least 2 minutes after the window end time. Is it possible to set watermark strategies separately per processing stage?

Thanks again for all your very helpful responses,

{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}   // <----- Arrives one event too late
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot +44 7961 125282 See our latest features and book me for a video call.



On Thu, 28 Jan 2021 at 12:34, Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>> wrote:

    I'm not sure I see the problem in your output.

    For any given key the timestamps are in order, and the events
    where devices are offline seem to occur at the right time.

    Is it just that you'd like the following line to occur earlier in
    the output?

    {"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}

    If so, then I'd just partition the output by key and evaluate them
    individually.

    On 1/28/2021 9:53 AM, Pilgrim Beart wrote:
    Scratch that - your WatermarkStrategy DOES work (when I implement
    it correctly!).
    Well, almost: as you can see below (code pushed to repo), the
    timer events still appear somewhat late in the stream, 4
    events late in this case. That may be good enough for my
    purposes, but it will make building test cases painful, so
    any ideas on how I could fix that would be much appreciated.

    {"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
    {"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
    {"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
    {"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
    {"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
    {"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
    {"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
    {"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
    {"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
    {"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
    {"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
    {"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
    {"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelled previous timer. "}
    {"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 Cancelled previous timer. "}
    {"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
    {"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
    {"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
    {"ts":5000,"id":"1","is_online":false,"log":"timestamp is 5000"}
    {"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 Cancelled previous timer. "}
    {"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
    {"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
    {"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
    {"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
    {"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
    {"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
    {"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
    {"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
    {"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
    {"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
    {"ts":11000,"id":"1","is_online":false,"log":"timestamp is 11000"}
    {"ts":11000,"id":"2","is_online":false,"log":"timestamp is 11000"}
    {"ts":11000,"id":"0","is_online":false,"log":"timestamp is 11000"}
    -Pilgrim



    On Thu, 28 Jan 2021 at 08:37, Pilgrim Beart
    <pilgrim.be...@devicepilot.com
    <mailto:pilgrim.be...@devicepilot.com>> wrote:

        Chesnay,
        I cannot reproduce this - I've tried the approaches you
        suggest, but nothing I've done makes the timers fire at the
        correct time in the stream - they only fire when the stream
        has ended. If you have an EventTime example where they fire
        at the right time in the stream, I'd love to see it. Or any
        ideas for other things to try? Could it perhaps be related to
        using a file source?

        Thanks,

        -Pilgrim



        On Wed, 27 Jan 2021 at 17:55, Chesnay Schepler
        <ches...@apache.org <mailto:ches...@apache.org>> wrote:

            Actually, if the parallelism is 1 then it works as it
            should. sigh....

            On 1/27/2021 6:52 PM, Chesnay Schepler wrote:
            Note that while this does fix the issue of timers not
            firing while the job is running, it seems to be firing
            too many timers...

            On 1/27/2021 6:49 PM, Chesnay Schepler wrote:
            My bad, I was still using the custom WatermarkStrategy
            that emits a watermark for each event:

                .assignTimestampsAndWatermarks(
                    new WatermarkStrategy<T>() {
                        @Override
                        public WatermarkGenerator<T> createWatermarkGenerator(
                                WatermarkGeneratorSupplier.Context context) {
                            return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
                                @Override
                                public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                                    super.onEvent(event, eventTimestamp, output);
                                    // also emit a watermark right after every event
                                    super.onPeriodicEmit(output);
                                }
                            };
                        }
                    }.withTimestampAssigner(...))

            @Aljoscha This is about Flink 1.11. Since periodic
            watermarks depend on processing time, am I correct in
            assuming that if the job finishes quickly, watermarks
            may never be emitted (except for the final one at the
            end of the job)? Is there any way to emit periodic
            watermarks based on event time?
            Is there any way to enable punctuated watermarks for
            the existing watermark strategies without having to
            implement a custom one?

            On 1/27/2021 5:57 PM, Pilgrim Beart wrote:
            Chesnay,
            Thanks for this - I've made the change you suggested
            (setAutoWatermarkInterval) but it hasn't changed the
            behaviour - timers still get processed only on stream end.
            I have pushed a new version, with this change, and
            also emitting some information in a .log field.
            If you search for "!!!" in Ingest.java
            and DPTimeoutFunction.java you'll see the relevant
            changes.

            In DPTimeoutFunction you'll see that if I add code to
            say "cancel the timer only if it wouldn't have gone
            off" then the output is now correct - individual
            devices do timeout. However, this output only appears
            at the end of the stream (i.e. time jumps backwards as
            all the timers are processed) so I still appear not to
            be seeing timer processing at the correct event time.
            If there was no end of stream, I would never get any
            timeouts.

            Below is the output I get when I run. This output is
            correct but:
            a) only because I am manually cancelling timers in
            DPTimeoutFunction (search for "!!!")
            b) the timer events are timestamped correctly, but are
            not emitted into the stream at the right time - and if
            the stream didn't end then no timeouts would ever
            occur (which in particular means that devices that
            never come back online will never get marked as offline).

            Perhaps I do need to implement an onPeriodicEmit
            function? Does that require a custom watermark
            strategy? I can see how to define a custom watermark
            generator at the link below, but it's unclear how to
            install it.
            https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy

            {"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
            {"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
            {"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
            {"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
            {"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
            {"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
            {"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
            {"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
            {"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
            {"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
            {"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
            {"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelling previous timer. "}
            {"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 "}
            {"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
            {"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
            {"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
            {"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 "}
            {"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
            {"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
            {"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
            {"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
            {"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
            {"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
            {"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
            {"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
            {"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
            {"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
            {"ts":1001,"id":"2","is_online":false} // These are the "going offline" events that we want to see. But they are emitted only once the stream has ended.
            {"ts":5001,"id":"1","is_online":false}
            {"ts":11001,"id":"1","is_online":false}
            {"ts":11001,"id":"0","is_online":false}
            {"ts":11001,"id":"2","is_online":false}

            Thanks,

            -Pilgrim



            On Wed, 27 Jan 2021 at 14:09, Chesnay Schepler
            <ches...@apache.org <mailto:ches...@apache.org>> wrote:

                You were right that it is an issue with the
                watermarks: apart from when the job was stopped,
                they were never emitted downstream, so no timer
                was ever triggered.

                It appears that you need to set the auto-watermark
                interval on the ExecutionConfig via

                    env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());


                to have them emitted periodically. Alternatively,
                you could override
                BoundedOutOfOrdernessWatermarks#onEvent to also
                emit a watermark for each event (for example, by
                calling #onPeriodicEmit).
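The onEvent override described above turns a periodic generator into a per-event (punctuated) one. The self-contained toy model below uses hypothetical stand-ins (`ToyWatermarkGenerator`, `ToyBoundedGenerator`, `ToyPerEventGenerator`), not Flink's real `WatermarkGenerator` or `BoundedOutOfOrdernessWatermarks` classes, and assumes the watermark formula maxTimestamp - bound - 1. It shows what gets emitted when the periodic callback never gets a chance to run, as in a job that finishes before the first interval elapses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for a watermark generator: same two callbacks, simplified output. */
interface ToyWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, LongConsumer output);
    void onPeriodicEmit(LongConsumer output);
}

/** Simplified bounded-out-of-orderness generator: emits only from the periodic callback. */
class ToyBoundedGenerator<T> implements ToyWatermarkGenerator<T> {
    private final long boundMs;
    private long maxTs;

    ToyBoundedGenerator(long boundMs) {
        this.boundMs = boundMs;
        this.maxTs = Long.MIN_VALUE + boundMs + 1; // lowest watermark is Long.MIN_VALUE
    }

    @Override
    public void onEvent(T event, long eventTimestamp, LongConsumer output) {
        maxTs = Math.max(maxTs, eventTimestamp); // remember the max, emit nothing
    }

    @Override
    public void onPeriodicEmit(LongConsumer output) {
        output.accept(maxTs - boundMs - 1);
    }
}

/** The workaround: also emit from onEvent, so every element is followed by a watermark. */
class ToyPerEventGenerator<T> extends ToyBoundedGenerator<T> {
    ToyPerEventGenerator(long boundMs) {
        super(boundMs);
    }

    @Override
    public void onEvent(T event, long eventTimestamp, LongConsumer output) {
        super.onEvent(event, eventTimestamp, output);
        super.onPeriodicEmit(output);
    }
}

class PeriodicVsPerEvent {
    /** Feeds timestamps to a generator without ever calling onPeriodicEmit. */
    static List<Long> runWithoutPeriodicTimer(ToyWatermarkGenerator<String> gen, long... timestamps) {
        List<Long> emitted = new ArrayList<>();
        for (long ts : timestamps) {
            gen.onEvent("event", ts, wm -> emitted.add(wm));
        }
        return emitted;
    }
}
```

In this model the periodic-only generator emits nothing for timestamps 0, 1000 and 2000, while the per-event variant emits -1, 999 and 1999, one watermark after each element.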

                Put another way, if you use any of the built-in
                WatermarkGenerators and use event-time, then it
                appears that you *must* set this interval.

                This behavior is...less than ideal I must admit,
                and it does not appear to be properly documented.

                On 1/27/2021 1:56 PM, Chesnay Schepler wrote:

                Based on your description you aren't doing
                anything obviously wrong.

                Would it be possible for you to share the code
                with us?

                On 1/27/2021 1:02 PM, Pilgrim Beart wrote:
                A newbie question:

                I've created a basic Flink DataStream job for an
                IoT use-case, with file source and sink for testing.
                I key by device ID, then in a ProcessFunction
                set an EventTime Timer to fire if a device falls
                silent, i.e. a timeout, which I cancel if
                another message arrives from that device within
                the timeout.

                My test source generates 3 devices, one of which
                falls silent for more than the timeout period
                during the stream, then resumes again. So I
                expect the Timer to fire for that device during
                the stream, and then for all the Timers to fire
                after the end of the stream.

                The timers do indeed fire at the end of the
                stream (e.g. with a timeout of 1000, the timers
                all fire 1000 after the end of the stream, which
                is correct). But no timer fires for the device
                which falls silent during the stream (even
                though other devices are still talking,
                advancing event time). I've verified that I am
                keying correctly by ID.

                I suspect this is something to do with
                Watermarks. I'm using forBoundedOutOfOrderness
                watermarking with a duration of 0.

                All suggestions welcome, thanks.

                -Pilgrim
