Hi Larry,
the basic problem for your use case is that window boundaries are
inclusive for the start timestamp and exclusive for the end timestamp.
It's set up like this to ensure that consecutive tumbling windows don't
overlap. This is only a function of how our `WindowAssigner` works, so
it could be done differently in a different system.
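To illustrate the boundary rule, here is a small plain-Java sketch that models tumbling window assignment with an offset of zero. It is a simplified model, not Flink's actual `WindowAssigner` code, and the class and method names are hypothetical.

```java
import java.time.LocalTime;

// Hypothetical, simplified model of tumbling event-time window assignment.
// It is NOT Flink's code; it only mirrors the rule that a window
// [start, end) includes its start timestamp and excludes its end timestamp.
class TumblingWindowSketch {
    static final long HOUR_MS = 3_600_000L;

    // Start of the window that contains timestampMs (window offset of 0 assumed).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // True if timestampMs lies in [startMs, startMs + sizeMs).
    static boolean contains(long startMs, long sizeMs, long timestampMs) {
        return timestampMs >= startMs && timestampMs < startMs + sizeMs;
    }

    // Converts "HH:mm:ss" to milliseconds since midnight.
    static long ms(String hhmmss) {
        return LocalTime.parse(hhmmss).toSecondOfDay() * 1000L;
    }

    public static void main(String[] args) {
        long noonWindow = windowStart(ms("12:00:00"), HOUR_MS);
        for (String t : new String[] {"12:45:00", "13:00:00"}) {
            System.out.printf("%s in [12:00, 13:00)? %b%n",
                    t, contains(noonWindow, HOUR_MS, ms(t)));
        }
    }
}
```

Running it reports that 12:45:00 falls inside [12:00, 13:00) while 13:00:00 does not, which is why the reading at 13:00:00 is not available to the 12:00 window.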
Have you tried using a sliding window with a `slide` of one hour and a
`size` of one hour plus 1ms, i.e. where the `slide` is `size - 1ms`?
With this, you would ensure that elements that fall exactly on the
boundary, i.e. your hourly sensor updates, would end up in both of the
consecutive windows. It seems a bit unorthodox but could work in your
case.
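In the DataStream API this would be something like `SlidingEventTimeWindows.of(Time.milliseconds(3_600_001), Time.hours(1))`. The plain-Java sketch below models how such a window assigner places elements, assuming an offset of zero. It is a simplified model with hypothetical names, not Flink's implementation.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of sliding event-time window assignment
// (offset 0). The latest window starts at the slide boundary at or before
// the timestamp; earlier windows are added one slide at a time while they
// still cover the timestamp.
class SlidingWindowSketch {
    static final long HOUR_MS = 3_600_000L;

    // Start timestamps of all windows [start, start + size) containing timestampMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    // Converts "HH:mm:ss" to milliseconds since midnight.
    static long ms(String hhmmss) {
        return LocalTime.parse(hhmmss).toSecondOfDay() * 1000L;
    }

    // Formats milliseconds since midnight as a time of day.
    static String fmt(long millis) {
        return LocalTime.ofNanoOfDay(millis * 1_000_000L).toString();
    }

    public static void main(String[] args) {
        long slide = HOUR_MS;     // one window per hour
        long size = HOUR_MS + 1;  // size = slide + 1ms, i.e. slide = size - 1ms
        for (String t : new String[] {"12:45:00", "13:00:00"}) {
            List<String> windows = new ArrayList<>();
            for (long s : windowStarts(ms(t), size, slide)) {
                windows.add("[" + fmt(s) + ", " + fmt(s + size) + ")");
            }
            System.out.println(t + " -> " + windows);
        }
    }
}
```

With these parameters, 13:00:00 is assigned both to the window starting at 12:00 and to the one starting at 13:00, while 12:45:00 lands only in the 12:00 window.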
Best,
Aljoscha
On 2021/01/08 08:56, Larry Aspen wrote:
Hi,
I'm evaluating Flink for our company's IoT use case and read a blog post
by Fabian Hueske from 2015 [1]. We have a similar situation except the
sensor is sending the value of a cumulative counter instead of a count.
We would like to calculate the sum of deltas of consecutive cumulative
counter values that occur during a time window.
Here is a scenario of a cumulative counter measuring runtime in seconds
and a machine starting for the first time at 12:00:00 and running for
the whole hour (the sensor records a value when the machine starts, every
15 minutes, and on each hour change):
timestamp, cumulative counter value in seconds
12:00:00, 0
12:15:00, 900
12:30:00, 1800
12:45:00, 2700
13:00:00, 3600
This would produce the following deltas:
12:00:00, 900 - 0 = 900
12:15:00, 1800 - 900 = 900
12:30:00, 2700 - 1800 = 900
12:45:00, 3600 - 2700 = 900
We would then sum the deltas to get runtime in seconds for the hour:
900 + 900 + 900 + 900 = 3600
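The calculation above can be sketched in plain Java as follows. This is a hypothetical helper (not a Flink API) that could, for example, run over the elements of one window; it sorts the readings by timestamp first, so the result does not depend on the order in which they arrived.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: sums the deltas of consecutive cumulative counter
// readings after sorting them by timestamp.
class CounterDeltaSketch {
    static final class Reading {
        final long timestampMs;
        final long counterSeconds;

        Reading(long timestampMs, long counterSeconds) {
            this.timestampMs = timestampMs;
            this.counterSeconds = counterSeconds;
        }
    }

    static long sumOfDeltas(List<Reading> readings) {
        List<Reading> sorted = new ArrayList<>(readings);
        sorted.sort(Comparator.comparingLong(rd -> rd.timestampMs));
        long sum = 0;
        for (int i = 1; i < sorted.size(); i++) {
            // Delta between this reading and the previous one.
            sum += sorted.get(i).counterSeconds - sorted.get(i - 1).counterSeconds;
        }
        return sum;
    }

    // Builds a reading from an "HH:mm:ss" timestamp and a counter value.
    static Reading r(String hhmmss, long counterSeconds) {
        return new Reading(LocalTime.parse(hhmmss).toSecondOfDay() * 1000L, counterSeconds);
    }

    public static void main(String[] args) {
        // The readings from the scenario, deliberately shuffled.
        List<Reading> readings = List.of(
                r("12:30:00", 1800), r("12:00:00", 0), r("13:00:00", 3600),
                r("12:15:00", 900), r("12:45:00", 2700));
        System.out.println(sumOfDeltas(readings)); // prints 3600
    }
}
```

Because the deltas telescope, the sum equals the last counter value minus the first one in the window; the loop form is kept in case the individual deltas are needed. Without the 13:00:00 reading the same helper returns 2700.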
What would be a good way to handle this kind of calculation in Flink?
I have already tried using a tumbling event time window of one hour,
but then the last value (13:00:00) belongs only to the next window, so
the delta of 12:45:00 is missing and the sum is 900 + 900 + 900 = 2700.
I have also tried a sliding event time window of two hours where the sum
is calculated for the first hour. This produces the correct sum in this
scenario but fails if the next value arrives later (e.g. 12:45:00 followed
by 14:00:00, i.e. the machine is shut down between 12:45:00 and 13:00:00
and started again at 14:00:00).
My latest attempt has been to use a global window where I try to keep
the values for the last two hours and calculate the sum for the older
hour. This seems to work in my experiments where I read values from
a file and use a parallelism of one. If I increase the parallelism, the
values are processed out of order and the results are incorrect, as
older values are received after newer values, which causes them to be
evicted.
Any advice on this would be appreciated.
Best regards,
Larry Aspen
[1] https://flink.apache.org/news/2015/12/04/Introducing-windows.html