Hi Jan,
Another solution is to insert heartbeat events at the source for
each sensor. This is very similar to how watermarks are advanced
when a source partition has no elements.
However, it's only easy to implement if you have your own source
and know all sensors at application start. It might also be
possible to implement with the new Source interface.
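A minimal plain-Java sketch of the heartbeat idea, assuming the set of sensors is known at startup. The "source" here is just a list, not a Flink source, and `SensorEvent`, `withHeartbeats` and `countsPerInterval` are hypothetical names. Counting mimics lazy window creation: an entry only exists if some event fell into the interval, and heartbeats create the entry but add 0 to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical event type: a real sensor message or a heartbeat injected by the source. */
final class SensorEvent {
    final String sensor;
    final long timestampMs;
    final boolean heartbeat;

    SensorEvent(String sensor, long timestampMs, boolean heartbeat) {
        this.sensor = sensor;
        this.timestampMs = timestampMs;
        this.heartbeat = heartbeat;
    }
}

final class HeartbeatSketch {
    /** Adds one heartbeat per known sensor and interval, as a custom source could do. */
    static List<SensorEvent> withHeartbeats(List<SensorEvent> real, List<String> knownSensors,
                                            long startMs, long endMs, long intervalMs) {
        List<SensorEvent> all = new ArrayList<>(real);
        for (long t = startMs; t < endMs; t += intervalMs) {
            for (String sensor : knownSensors) {
                all.add(new SensorEvent(sensor, t, true));
            }
        }
        return all;
    }

    /**
     * Counts real messages per "sensor@intervalStart". An entry only exists if at least
     * one event fell into the interval; heartbeats create the entry but add 0.
     */
    static SortedMap<String, Integer> countsPerInterval(List<SensorEvent> events, long intervalMs) {
        SortedMap<String, Integer> counts = new TreeMap<>();
        for (SensorEvent e : events) {
            long start = Math.floorDiv(e.timestampMs, intervalMs) * intervalMs;
            counts.merge(e.sensor + "@" + start, e.heartbeat ? 0 : 1, Integer::sum);
        }
        return counts;
    }
}
```

Without heartbeats only intervals that received messages appear; with them, every known sensor gets an entry for every interval, so a count of 0 becomes visible downstream.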
On Tue, Feb 9, 2021 at 7:20 AM Yun Gao <yungao...@aliyun.com> wrote:
Hi,
I also think there are several ways to achieve this. For the
first option I listed previously, the code would look roughly
like this:
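import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

class MyFunction<K, T> extends KeyedProcessFunction<K, T, Tuple2<K, Integer>> {
    private static final long INTERVAL_MS = 30_000L; // emit every 30 s
    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class)); // create the value state
    }

    @Override
    public void processElement(T value, Context ctx,
            Collector<Tuple2<K, Integer>> out) throws Exception {
        Integer current = count.value();
        if (current == null) {
            // first element for this key: register the first (processing-time) timer
            long now = ctx.timerService().currentProcessingTime();
            ctx.timerService().registerProcessingTimeTimer(now + INTERVAL_MS);
            current = 0;
        }
        count.update(current + 1); // update the count
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Tuple2<K, Integer>> out) throws Exception {
        out.collect(Tuple2.of(ctx.getCurrentKey(), count.value())); // may be 0
        count.update(0); // reset for the next interval
        ctx.timerService().registerProcessingTimeTimer(timestamp + INTERVAL_MS); // register the following timer
    }
}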
1. In Flink, state and timers are implicitly bound to the current
key, so they do not need to be bound manually.
2. Outdated state can be removed with count.clear() once the
count has been 0 for a long time. There are different ways to
measure that interval, e.g. registering an additional timer and
deleting it when an element arrives, or updating the counter to
-1, -2, ... to record how many timers have fired without any
element.
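The negative-counter variant from point 2 could look roughly like this. It is a plain-Java model, not Flink code: two maps stand in for the keyed `ValueState<Integer>` and the timer service, and `MAX_IDLE_FIRES = 3` is a hypothetical threshold. In a real KeyedProcessFunction the clean-up step would be `count.clear()` plus not registering the next timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of one keyed counter with idle clean-up (not Flink code). */
final class IdleCleanupSketch {
    static final long INTERVAL_MS = 30_000L;
    static final int MAX_IDLE_FIRES = 3; // hypothetical: clear after 3 empty intervals

    final Map<String, Integer> count = new HashMap<>(); // stands in for ValueState<Integer>
    final Map<String, Long> timers = new TreeMap<>();   // at most one pending timer per key
    final List<String> emitted = new ArrayList<>();

    void processElement(String key, long nowMs) {
        Integer current = count.get(key);
        if (current == null) {
            timers.put(key, nowMs + INTERVAL_MS); // first element: register the first timer
            current = 0;
        } else if (current < 0) {
            current = 0; // negative values only marked empty intervals
        }
        count.put(key, current + 1);
    }

    void onTimer(String key, long timestampMs) {
        int current = count.get(key);
        if (current > 0) {
            emitted.add(key + "=" + current);
            count.put(key, 0); // reset for the next interval
        } else {
            emitted.add(key + "=0"); // the zero the downstream system needs
            int idle = current - 1;  // 0 -> -1 -> -2 ...: consecutive empty intervals
            if (-idle >= MAX_IDLE_FIRES) {
                count.remove(key);  // like count.clear()
                timers.remove(key); // and no further timer is registered
                return;
            }
            count.put(key, idle);
        }
        timers.put(key, timestampMs + INTERVAL_MS); // register the following timer
    }

    /** Fires all timers due at or before nowMs, in order per key. */
    void advanceTo(long nowMs) {
        for (String key : new ArrayList<>(timers.keySet())) {
            Long due;
            while ((due = timers.get(key)) != null && due <= nowMs) {
                onTimer(key, due);
            }
        }
    }
}
```

With two elements at t=0 and t=1s, advancing to t=120s emits one count of 2, then three zeros, after which both the state and the timer for the key are gone. A new element later simply starts over with a fresh timer.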
Best,
Yun
------------------Original Mail ------------------
*Sender:* Khachatryan Roman <khachatryan.ro...@gmail.com>
*Send Date:* Tue Feb 9 02:35:20 2021
*Recipients:* Jan Brusch <jan.bru...@neuland-bfi.de>
*CC:* Yun Gao <yungao...@aliyun.com>, user <user@flink.apache.org>
*Subject:* Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem
Hi,
Another possible solution would be to register a
timer (using a KeyedProcessFunction) once we see an
element after keyBy. The timer fires after
windowIntervalMs. Upon firing, it emits a dummy
element, which is ignored (or subtracted) at the end.
Upon receiving each new element, the function
shifts the timer accordingly.
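A plain-Java model of the timer shifting, for illustration only: the `TreeMap` stands in for Flink's timer service plus a state entry holding the pending timer's timestamp. In Flink itself, "shifting" would presumably mean `deleteProcessingTimeTimer(old)` followed by `registerProcessingTimeTimer(new)`, since registering a new timer does not remove the old one. `ShiftingTimerSketch` and the `":dummy"` marker are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of "shift the timer on every element, emit a dummy when it fires". */
final class ShiftingTimerSketch {
    final long windowIntervalMs;
    final Map<String, Long> pendingTimer = new TreeMap<>(); // one pending timer per key
    final List<String> output = new ArrayList<>(); // real elements and dummies, in emission order

    ShiftingTimerSketch(long windowIntervalMs) {
        this.windowIntervalMs = windowIntervalMs;
    }

    void processElement(String key, long nowMs) {
        output.add(key + "@" + nowMs);                   // forward the real element
        pendingTimer.put(key, nowMs + windowIntervalMs); // replaces (shifts) any older timer
    }

    void advanceTo(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = pendingTimer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> timer = it.next();
            if (timer.getValue() <= nowMs) {
                // no element for a whole window interval: emit a dummy so the downstream
                // window for this key exists; the final count ignores dummies
                output.add(timer.getKey() + "@" + timer.getValue() + ":dummy");
                it.remove();
            }
        }
    }
}
```

The second element at t=50s moves the timer from 60s to 110s, so the dummy only appears once the key has been quiet for a full interval.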
Regards,
Roman
On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch
<jan.bru...@neuland-bfi.de> wrote:
Hi Yun,
thanks for your reply.
I do agree with your point that standard windows
are meant for high-level operations and that the
lower-level APIs offer a rich toolset for most
advanced use cases.
I also tried to solve my problem with
KeyedProcessFunctions but could not get it to
work, for two reasons:
1) I was not able to set up a combination of
ValueState, timers and triggers that emulated a
sliding window with a rising and falling count
(including 0) well enough.
2) Memory leak: state / windows should be
cleared after they have been at count 0 for a
certain time, to prevent unbounded growth of
ValueStates that are no longer needed.
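One technique not proposed in this thread that addresses both points is to keep one counter bucket per slide interval instead of the raw messages: for n minutes with 30 s slides that is 2n buckets per key. The class below is a hypothetical plain-Java sketch; in Flink the array would presumably live in a `ValueState<long[]>` and `onSlide` would be driven by a timer every 30 s.

```java
/** Hypothetical per-key sliding count: one bucket per slide interval. */
final class BucketedSlidingCount {
    private final long[] buckets; // buckets[head] collects the current slide interval
    private int head = 0;
    private long total = 0;       // sum over all buckets, i.e. the sliding-window count

    BucketedSlidingCount(int bucketsPerWindow) {
        buckets = new long[bucketsPerWindow];
    }

    void add() {
        buckets[head]++;
        total++;
    }

    /** Called once per slide: returns the count over the last window, then rotates. */
    long onSlide() {
        long result = total;
        head = (head + 1) % buckets.length;
        total -= buckets[head]; // the oldest bucket falls out of the window
        buckets[head] = 0;
        return result;
    }

    /** True once the window is empty, i.e. state could be cleared after emitting 0. */
    boolean isEmpty() {
        return total == 0;
    }
}
```

With a window of two slides, adding two elements, sliding, adding one more and sliding three times yields 2, 3, 1, 0: the count rises and falls back to zero. Memory per key is bounded by the number of buckets rather than by the message rate, and `isEmpty()` marks the point where the state could be dropped.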
Could you please sketch in pseudocode how you
would envision your solution?
Best regards
Jan
On 08.02.21 05:31, Yun Gao wrote:
Hi Jan,
In my view, windows in Flink are a "high-level"
operation for certain kinds of aggregation. If
they cannot satisfy the requirements, we can
always fall back to the "low-level" API,
KeyedProcessFunction [1].
In this case, we could use a ValueState to
store the current value for each key and
increment it on each element. We would also
register a timer for each key when the first
element for that key arrives, and in the
onTimer callback we would send the current
state value, reset it to 0 and register
another timer for this key 30 s later.
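The count-and-reset logic for a single key can be traced with a small plain-Java function. This is a model of the timer behaviour under the assumption of a 30 s interval and in-order timestamps, not Flink code; `ResetCounterSketch` and `emittedCounts` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the per-key count-and-reset timer logic (not Flink code). */
final class ResetCounterSketch {
    static final long INTERVAL_MS = 30_000L;

    /** Counts emitted for one key, given its element timestamps in ascending order, up to untilMs. */
    static List<Integer> emittedCounts(long[] eventTimesMs, long untilMs) {
        List<Integer> out = new ArrayList<>();
        if (eventTimesMs.length == 0) {
            return out; // no element: no state and no timer for this key
        }
        long nextTimer = eventTimesMs[0] + INTERVAL_MS; // registered on the first element
        int count = 0;
        int i = 0;
        while (nextTimer <= untilMs) {
            while (i < eventTimesMs.length && eventTimesMs[i] < nextTimer) {
                count++; // processElement: increment the ValueState
                i++;
            }
            out.add(count);           // onTimer: send the current value,
            count = 0;                // reset it to 0,
            nextTimer += INTERVAL_MS; // and register the next timer 30 s later
        }
        return out;
    }
}
```

Elements at 0 s, 5 s and 40 s, observed until 120 s, produce the counts 2, 1, 0, 0: once a key has been seen, it keeps reporting, including zeros.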
Best,
Yun
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
------------------Original Mail ------------------
*Sender:* Jan Brusch <jan.bru...@neuland-bfi.de>
*Send Date:* Sat Feb 6 23:44:00 2021
*Recipients:* user <user@flink.apache.org>
*Subject:* Sliding Window Count: Tricky Edge Case / Count Zero Problem
Hi,
I was recently working on a problem where we
wanted to implement a
simple count on a sliding window, e.g. "how
many messages of a certain
type were emitted by a certain type of sensor in the
last n minutes".
This sounds simple enough in theory:
messageStream
  .map(msg => (/* EmitterType + MessageType */, 1))
  .keyBy(_._1)
  .window(SlidingProcessingTimeWindows.of(Time.minutes(n), Time.seconds(30)))
  .reduce((x, y) => (x._1, x._2 + y._2))
  .addSink(...)
But there is a tricky edge case: the downstream
systems never learn when the count for a certain
key drops back to 0, which is important for our
use case. The technical reason is that Flink does
not create a window if no elements are assigned
to it, i.e. a window with count 0 does not exist
in Flink.
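The lazy window creation can be reproduced in a few lines of plain Java. The assignment loop is modelled on how a sliding window assigner with zero offset picks windows for one timestamp (every start that is a multiple of the slide with start <= t < start + size); it is an illustration, not Flink's own code, and `LazySlidingWindows` is a hypothetical name.

```java
import java.util.SortedMap;
import java.util.TreeMap;

/** Illustration: sliding windows exist only if some element was assigned to them. */
final class LazySlidingWindows {
    /** Count per sliding window, keyed by window start. */
    static SortedMap<Long, Integer> counts(long[] timestampsMs, long sizeMs, long slideMs) {
        SortedMap<Long, Integer> counts = new TreeMap<>();
        for (long t : timestampsMs) {
            // every window [start, start + sizeMs) whose start is a multiple of slideMs and contains t
            for (long start = Math.floorDiv(t, slideMs) * slideMs; start > t - sizeMs; start -= slideMs) {
                counts.merge(start, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

With a 60 s window sliding by 30 s and elements at 10 s and 20 s, only the windows starting at -30 s and 0 s are created; the window [30 s, 90 s), which would have count 0, is never created and therefore never fires.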
We came up with the following solution for the
time being:
messageStream
  .keyBy(/* EmitterType + MessageType */)
  .window(GlobalWindows.create())
  .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
  .evictor(/* CustomEvictor: evict all messages older than n minutes BEFORE processing the window */)
  .process(/* CustomCounter: count all messages in the window state */)
  .addSink(...)
If there were no messages in the last n minutes,
all messages are evicted from the window and the
process function is triggered one last time on the
now-empty window, so we can produce a count of 0.
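For comparison, the GlobalWindows + evictor workaround behaves roughly like the plain-Java model below for a single key. It is a sketch, not Flink's Evictor API: every message timestamp is kept, and each trigger first evicts and then counts, so a 0 appears once everything has been evicted. The model assumes in-order timestamps so it can stop at the first young element; an evictor that inspects every element does more work per trigger. `EvictingCountSketch` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Model of the GlobalWindows + evictor workaround for one key (not Flink code). */
final class EvictingCountSketch {
    private final Deque<Long> elements = new ArrayDeque<>(); // grows with the message rate
    private final long windowMs;

    EvictingCountSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    void add(long timestampMs) {
        elements.addLast(timestampMs);
    }

    /** Called on every trigger (every 30 s): evict, then count. */
    int onTrigger(long nowMs) {
        while (!elements.isEmpty() && elements.peekFirst() <= nowMs - windowMs) {
            elements.pollFirst(); // evict messages older than the window
        }
        return elements.size(); // 0 once everything has been evicted
    }
}
```

The state holds one entry per message in the window, which is the cost described in problem 1 below; a per-slide bucket counter would need only a fixed number of longs per key.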
I have two problems with this solution, though:
1) It is computationally inefficient for a simple count, as the
custom process function always keeps all messages in state. And
on every trigger, all elements have to be touched twice: once to
compare the timestamp and once to count.
2) It seems like a very roundabout solution to a simple problem.
So I was wondering whether there is a more efficient or
"Flink-like" approach to this. Sorry for the long write-up,
but I would love to hear your thoughts.
Best regards
Jan
--
neuland – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen
Phone (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de
https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi
Managing Directors: Thomas Gebauer, Jan Zander
Register court: Amtsgericht Bremen, HRB 23395 HB
VAT ID: DE 246585501