Scratch that - your WatermarkStrategy DOES work (when I implement
it correctly!).
Well, almost: as you can see below (code pushed to repo), the
timer events still appear somewhat late in the stream - 4 events
late in this case. That may be good enough for my purposes,
though it will make building test cases painful, so any ideas on
how I could fix it would be much appreciated.
{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelled previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelled previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":1000,"id":"2","is_online":false,"log":"timestamp is 1000"}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelled previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelled previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 Cancelled previous timer. "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelled previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":5000,"id":"1","is_online":false,"log":"timestamp is 5000"}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelled previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelled previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelled previous timer. "}
{"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelled previous timer. "}
{"ts":11000,"id":"1","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"2","is_online":false,"log":"timestamp is 11000"}
{"ts":11000,"id":"0","is_online":false,"log":"timestamp is 11000"}
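The lag looks consistent with the watermark arithmetic, assuming the generator was copied with Duration.ofSeconds(1) from Chesnay's snippet quoted further down. BoundedOutOfOrdernessWatermarks emits maxTimestamp - outOfOrderness - 1, and an event-time timer only fires once the watermark has reached its timestamp, so device 2's timer at 1000 cannot fire until an event with ts >= 2001 has been seen. The sketch below is a stdlib-only model, not Flink code (the class and method names are hypothetical), that replays the event order from the log above with a watermark emitted right after each event:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only model, NOT Flink code: a keyed timeout function fed by a generator that
// emits maxTimestamp - bound - 1 right after every event (like the per-event override).
final class WatermarkLagDemo {
    // Event timestamps and device ids, in the order they appear in the log (up to ts 7000).
    static final long[] TS = {0, 0, 0, 1000, 1000, 2000, 2000, 3000, 3000,
                              4000, 4000, 5000, 5000, 6000, 6000, 7000};
    static final String[] ID = {"0", "1", "2", "0", "1", "0", "1", "0", "1",
                                "0", "1", "0", "2", "0", "2", "0"};

    static List<String> run(long timeout, long bound) {
        Map<String, Long> timers = new TreeMap<>(); // one pending timer per device id
        List<String> out = new ArrayList<>();
        long maxTs = Long.MIN_VALUE + bound + 1;    // avoids underflow in maxTs - bound - 1
        for (int i = 0; i < TS.length; i++) {
            // The keyed function sees the event first and replaces that device's timer.
            timers.put(ID[i], TS[i] + timeout);
            out.add("event " + ID[i] + "@" + TS[i]);
            // Then the watermark generated for this event arrives.
            maxTs = Math.max(maxTs, TS[i]);
            long watermark = maxTs - bound - 1;
            // An event-time timer fires once the watermark has reached its timestamp.
            for (Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator(); it.hasNext(); ) {
                Map.Entry<String, Long> timer = it.next();
                if (timer.getValue() <= watermark) {
                    out.add("offline " + timer.getKey() + "@" + timer.getValue());
                    it.remove();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 1000)); // bound of 1 s, as in Duration.ofSeconds(1)
        System.out.println(run(1000, 0));    // bound of 0, as in the original forBoundedOutOfOrderness setup
    }
}
```

With a 1 s bound the model reports device 2 offline right after device 0's ts=3000 event and device 1 right after device 0's ts=7000 event, matching the positions in the log. With a bound of 0 they move up to just after the first ts=2000 and ts=6000 events. A timeout can never surface before some later event has pushed the watermark past it, since nothing else tells the job that time has moved on.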
-Pilgrim
--
Learn more at https://devicepilot.com <https://devicepilot.com>
@devicepilot
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW4fQ47l4fGCmnW3Fbt5S3H4THtF3F6jFSWsSg1&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33>
+44 7961 125282
See our latest features
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW3Q_1QY1JxXxvW3SYLMH3T0vWRW1JxwY51LDhHjW3K1M0S1GFyF-3b732&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33>
and book me
<https://t.sidekickopen70.com/s2t/c/5/f18dQhb0S7kv8cpgQZVc6VPt59hl3kW7_k2842PjkFxW2R1KhZ7v4vclW2Rxbb82bzNKzf7GYHvr01?te=W3R5hFj4cm2zwW3T1k3k1JxXxvW3SYLMH3T0vWRW1JxwY51LBcj1W4fJfX_4cgB3QW3ZW3jf3_qrT_4Wt5h1&si=5987503666495488&pi=adc3545f-9610-4164-fa4a-2bddbd615e33>
for
a video call.
On Thu, 28 Jan 2021 at 08:37, Pilgrim Beart
<pilgrim.be...@devicepilot.com> wrote:
Chesnay,
I cannot reproduce this - I've tried the approaches you
suggested, but nothing I've done makes the timers fire at the
correct time in the stream; they only fire once the stream
has ended. If you have an event-time example where they fire
at the right time in the stream, I'd love to see it. Or do you
have any ideas for other things to try? Could it be related to
using a file source?
Thanks,
-Pilgrim
On Wed, 27 Jan 2021 at 17:55, Chesnay Schepler
<ches...@apache.org> wrote:
Actually, if the parallelism is 1, then it works as it
should. Sigh...
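One rule that may matter once parallelism is above 1: an operator with several input channels takes the minimum of the latest watermark received on each channel (channels marked idle aside) as its current event time, so a single upstream subtask that emits watermarks late, or only at end of input, holds back every downstream timer. Whether that is what happens in this job is an assumption. The stdlib-only sketch below (hypothetical names, not Flink API) only illustrates the min rule:

```java
// Stdlib-only illustration, NOT Flink code, of how an operator with several input
// channels derives its event-time clock: the minimum of the latest watermark per channel.
final class MinWatermarkDemo {
    static long combined(long[] latestPerChannel) {
        long min = Long.MAX_VALUE;
        for (long w : latestPerChannel) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        // Two upstream subtasks; neither has emitted a watermark yet.
        long[] channels = {Long.MIN_VALUE, Long.MIN_VALUE};
        channels[0] = 9_999;                    // subtask 0 has seen events up to ts 10000 (bound 0)
        System.out.println(combined(channels)); // still Long.MIN_VALUE: subtask 1 holds it back
        channels[1] = 4_999;                    // subtask 1 catches up to ts 5000
        System.out.println(combined(channels)); // 4999: timers up to 4999 may now fire
    }
}
```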
On 1/27/2021 6:52 PM, Chesnay Schepler wrote:
Note that while this does fix the issue of timers not
firing while the job is running, it seems to be firing
too many timers...
On 1/27/2021 6:49 PM, Chesnay Schepler wrote:
My bad, I was still using the custom WatermarkStrategy
that emits a watermark for each event.
    .assignTimestampsAndWatermarks(
        new WatermarkStrategy<T>() {
            @Override
            public WatermarkGenerator<T> createWatermarkGenerator(
                    WatermarkGeneratorSupplier.Context context) {
                return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
                    @Override
                    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                        super.onEvent(event, eventTimestamp, output);
                        // Also emit a watermark immediately instead of waiting for a periodic call.
                        super.onPeriodicEmit(output);
                    }
                };
            }
        }.withTimestampAssigner(...))
@Aljoscha This is about Flink 1.11. Since periodic
watermarks depend on processing time, am I correct
in assuming that, if the job finishes quickly,
watermarks may never be emitted (except for the one at
the end of the job)? Is there any way to emit periodic
watermarks based on event time?
Is there any way to enable punctuated watermarks for
the existing watermark strategies without having to
implement a custom one?
On 1/27/2021 5:57 PM, Pilgrim Beart wrote:
Chesnay,
Thanks for this - I've made the change you suggested
(setAutoWatermarkInterval) but it hasn't changed the
behaviour - timers still get processed only on stream end.
I have pushed a new version with this change, which
also emits some information in a .log field.
If you search for "!!!" in Ingest.java
and DPTimeoutFunction.java, you'll see the relevant
changes.
In DPTimeoutFunction you'll see that if I add code to
say "cancel the timer only if it wouldn't have gone
off" then the output is now correct - individual
devices do timeout. However, this output only appears
at the end of the stream (i.e. time jumps backwards as
all the timers are processed) so I still appear not to
be seeing timer processing at the correct event time.
If there was no end of stream, I would never get any
timeouts.
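The guard itself is not shown in the thread, so the following is a hypothetical single-device model of it in plain Java (not Flink code). Inside a KeyedProcessFunction the corresponding calls would presumably be ctx.timerService().deleteEventTimeTimer(...) and ctx.timerService().registerEventTimeTimer(...). The boundary (cancel when the old timer is at or after the new message's timestamp) is an assumption inferred from the logs below. The model shows why the output becomes correct but stays late: the stale timers survive with the right timestamps, yet still only fire when the final watermark arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Stdlib-only model, NOT Flink code, of "cancel the timer only if it wouldn't have
// gone off" for a single device. Timers are plain timestamps in a sorted set.
final class CancelGuardDemo {
    static List<Long> run(long[] messageTimestamps, long timeout) {
        TreeSet<Long> timers = new TreeSet<>(); // pending event-time timers for this device
        Long lastTimer = null;                  // the most recently registered timer
        for (long ts : messageTimestamps) {
            // Cancel the previous timer only if it is not yet due at this message's time.
            if (lastTimer != null && lastTimer >= ts) {
                timers.remove(lastTimer);
            }
            lastTimer = ts + timeout;
            timers.add(lastTimer);
        }
        // End of input: the final watermark fires every remaining timer in timestamp order.
        return new ArrayList<>(timers);
    }

    public static void main(String[] args) {
        // Messages at 0, 1000 and 3000 with a timeout of 1000: the gap 1000 -> 3000 exceeds it.
        System.out.println(run(new long[] {0, 1000, 3000}, 1000)); // [2000, 4000]
    }
}
```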
Below is the output I get when I run. This output is
correct but:
a) only because I am manually cancelling timers in
DPTimeoutFunction (search for "!!!")
b) the timer events are timestamped correctly, but are
not emitted into the stream at the right time - and if
the stream didn't end then no timeouts would ever
occur (which in particular means that devices that
never come back online will never get marked as offline).
Perhaps I do need to implement an onPeriodicEmit
function? Does that require a custom watermark
strategy? I can see how to define a custom watermark
generator at the link below, but it's unclear how to install it.
https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy
{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelling previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
{"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
{"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
{"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
{"ts":1001,"id":"2","is_online":false} // These are the "going offline" events that we want to see. But they are emitted only once the stream has ended.
{"ts":5001,"id":"1","is_online":false}
{"ts":11001,"id":"1","is_online":false}
{"ts":11001,"id":"0","is_online":false}
{"ts":11001,"id":"2","is_online":false}
Thanks,
-Pilgrim
On Wed, 27 Jan 2021 at 14:09, Chesnay Schepler
<ches...@apache.org> wrote:
You were right that it is an issue with the
watermarks; apart from when the job was
stopped, they were never emitted downstream, so no
timer was ever triggered.
It appears that you need to set the auto-watermark
interval on the ExecutionConfig via
env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());
to have them emitted periodically. Alternatively,
you could override
BoundedOutOfOrdernessWatermarks#onEvent to also
emit a watermark for each event (for example, by
calling #onPeriodicEmit).
Put another way, if you use any of the built-in
WatermarkGenerators and use event-time, then it
appears that you *must* set this interval.
This behavior is...less than ideal I must admit,
and it does not appear to be properly documented.
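To make the two options concrete, here is a stdlib-only model (not the Flink classes; all names are hypothetical) of a generator that, like BoundedOutOfOrdernessWatermarks, normally emits only from onPeriodicEmit. If the runtime never calls onPeriodicEmit, as appears to be the case here while the interval is unset, no watermark reaches the timers while data is flowing; emitting from onEvent as well removes that dependency.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model, NOT the Flink classes: a bounded-out-of-orderness generator with the
// same onEvent/onPeriodicEmit split, run with no periodic calls at all (interval unset).
final class EmissionModeDemo {
    interface Output {
        void emitWatermark(long watermark);
    }

    static final class BoundedGenerator {
        private final long bound;
        private final boolean emitPerEvent; // true = also emit from onEvent, as suggested above
        private long maxTs;

        BoundedGenerator(long bound, boolean emitPerEvent) {
            this.bound = bound;
            this.emitPerEvent = emitPerEvent;
            this.maxTs = Long.MIN_VALUE + bound + 1;
        }

        void onEvent(long ts, Output out) {
            maxTs = Math.max(maxTs, ts);
            if (emitPerEvent) {
                onPeriodicEmit(out);
            }
        }

        void onPeriodicEmit(Output out) {
            out.emitWatermark(maxTs - bound - 1);
        }
    }

    // Watermarks seen downstream while input is still flowing, if the runtime never calls
    // onPeriodicEmit (the end-of-input watermark is not modelled).
    static List<Long> run(long[] timestamps, boolean emitPerEvent) {
        List<Long> seen = new ArrayList<>();
        BoundedGenerator gen = new BoundedGenerator(0, emitPerEvent);
        for (long ts : timestamps) {
            gen.onEvent(ts, wm -> seen.add(wm));
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] ts = {0, 1000, 2000};
        System.out.println(run(ts, false)); // []: nothing for timers to react to until the job ends
        System.out.println(run(ts, true));  // [-1, 999, 1999]
    }
}
```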
On 1/27/2021 1:56 PM, Chesnay Schepler wrote:
Based on your description you aren't doing
anything obviously wrong.
Would it be possible for you to share the code
with us?
On 1/27/2021 1:02 PM, Pilgrim Beart wrote:
A newbie question:
I've created a basic Flink DataStream job for an
IoT use case, with a file source and sink for testing.
I key by device ID, then in a ProcessFunction
set an EventTime Timer to fire if a device falls
silent, i.e. a timeout, which I cancel if
another message arrives from that device within
the timeout.
My test source generates 3 devices, one of which
falls silent for more than the timeout period
during the stream, then resumes again. So I
expect the Timer to fire for that device during
the stream, and then for all the Timers to fire
after the end of the stream.
The timers do indeed fire at the end of the
stream (e.g. with a timeout of 1000, the timers
all fire 1000 after the end of the stream, which
is correct). But no timer fires for the device
which falls silent during the stream (even
though other devices are still talking,
advancing event time). I've verified that I am
keying correctly by ID.
I suspect this has something to do with
watermarks. I'm using forBoundedOutOfOrderness
watermarking with a duration of 0.
All suggestions welcome, thanks.
-Pilgrim