Hi Shilpa,

There is no need to have artificial messages in the input Kafka topic (and I 
don’t see where Andrew suggests this 😊).

However, your use case is not 100% clear about which keys you want to emit 
0-count window results for, either:

  *   A) For all keys your job has ever seen (that’s easy), or
  *   B) For all keys your job has seen, but you stop sending 0-count windows 
after the first one is emitted, and only start again for a key when there is a 
new input event on that key, or
  *   C) For all keys from a pre-selected set of keys

A KeyedProcessFunction is the way to go.
I’ll sketch a solution for scenario A); the others are similar (Scala-ish):

class Manual0Windowing extends KeyedProcessFunction[…] {

  // state primitive for the aggregated window state,
  // with a default 0-window state when empty
  var state: ValueState[…] = _

  def open(…) = {
    state = getRuntimeContext.getState(…)
  }

  def processElement(event, ctx, out) = {
    // make sure a timer fires at the end of the window this event belongs to
    val windowEnd = getWindowEndTime(event)
    ctx.timerService().registerEventTimeTimer(windowEnd)

    // fold the event into the current aggregate (or the 0-window default)
    var currentState = state.value() // or default
    currentState = aggregate(currentState, event)
    state.update(currentState)
  }

  def onTimer(timestamp, ctx, out) = {
    val currentState = state.value() // or default

    if (is0Window(currentState)) {
      // for scenario B) drop the next line, so no further 0-count windows
      // are emitted until a new event arrives for this key
      ctx.timerService().registerEventTimeTimer(timestamp + tumblingWindowTime)
    } else {
      ctx.timerService().registerEventTimeTimer(timestamp + tumblingWindowTime)
    }

    out.collect(currentState)
    state.clear()
  }
}
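
You would then apply it after a keyBy, e.g. (assuming your events have a userId 
field to key on):

  events
    .keyBy(_.userId)
    .process(new Manual0Windowing)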

… Just to give an idea.
… This code does not take care of late events (you would need a MapState keyed 
by windowEndTime instead).
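
Roughly, such a late-event-tolerant variant could look like the following 
(untested sketch; Event, WindowResult, the helpers getWindowEndTime / aggregate / 
zeroWindow and the constants allowedLateness / tumblingWindowTime are 
placeholders, not a finished implementation):

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class Manual0WindowingWithLateness
    extends KeyedProcessFunction[String, Event, WindowResult] {

  // one aggregate per open window, keyed by the window end timestamp,
  // so a late event updates its own window instead of the current one
  var windows: MapState[java.lang.Long, WindowResult] = _

  override def open(parameters: Configuration): Unit = {
    windows = getRuntimeContext.getMapState(
      new MapStateDescriptor[java.lang.Long, WindowResult](
        "windows", classOf[java.lang.Long], classOf[WindowResult]))
  }

  override def processElement(
      event: Event,
      ctx: KeyedProcessFunction[String, Event, WindowResult]#Context,
      out: Collector[WindowResult]): Unit = {
    val windowEnd = getWindowEndTime(event)
    val current = Option(windows.get(windowEnd)).getOrElse(zeroWindow(windowEnd))
    windows.put(windowEnd, aggregate(current, event))
    // fire only after the allowed lateness has passed
    ctx.timerService().registerEventTimeTimer(windowEnd + allowedLateness)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Event, WindowResult]#OnTimerContext,
      out: Collector[WindowResult]): Unit = {
    val windowEnd = timestamp - allowedLateness
    // emit the aggregate for that window, or the 0-window default
    out.collect(Option(windows.get(windowEnd)).getOrElse(zeroWindow(windowEnd)))
    windows.remove(windowEnd)
    // keep the loop going so the next window is emitted even without events
    ctx.timerService().registerEventTimeTimer(timestamp + tumblingWindowTime)
  }
}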

What do you think…?

Thias



From: Shilpa Shankar <sshan...@bandwidth.com>
Sent: Monday, May 9, 2022 4:00 PM
To: Dario Heinisch <dario.heini...@gmail.com>; Andrew Otto <o...@wikimedia.org>
Cc: user@flink.apache.org
Subject: Re: Notify on 0 events in a Tumbling Event Time Window

Thanks Andrew.
We did consider this solution too. Unfortunately we do not have permissions to 
generate artificial kafka events in our ecosystem.

Dario,
Thanks for your inputs. We will give your design a try. Due to the number of 
events being processed per window, we are using the incremental aggregate function 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#processwindowfunction-with-incremental-aggregation.
Do you think we can use KeyedCoProcessFunction in this design?
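
For reference, the incremental-aggregation pattern from that link looks roughly 
like this in our setup (sketch only; the Event / WindowStats types and field 
names are placeholders):

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Event(userId: String, value: Double)        // placeholder type
case class WindowStats(userId: String, count: Long, avg: Double)

// incrementally keeps only a (count, sum) accumulator per window
class CountAvgAgg extends AggregateFunction[Event, (Long, Double), (Long, Double)] {
  override def createAccumulator(): (Long, Double) = (0L, 0.0)
  override def add(e: Event, acc: (Long, Double)): (Long, Double) =
    (acc._1 + 1, acc._2 + e.value)
  override def getResult(acc: (Long, Double)): (Long, Double) = acc
  override def merge(a: (Long, Double), b: (Long, Double)): (Long, Double) =
    (a._1 + b._1, a._2 + b._2)
}

// receives exactly one pre-aggregated value per window and key
class EmitStats extends ProcessWindowFunction[(Long, Double), WindowStats, String, TimeWindow] {
  override def process(key: String, context: Context,
                       aggs: Iterable[(Long, Double)], out: Collector[WindowStats]): Unit = {
    val (count, sum) = aggs.head
    out.collect(WindowStats(key, count, if (count == 0) 0.0 else sum / count))
  }
}

// usage (hypothetical stream of Event):
// events.keyBy(_.userId)
//   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
//   .aggregate(new CountAvgAgg, new EmitStats)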

Thanks,
Shilpa







On Mon, May 9, 2022 at 9:31 AM Dario Heinisch 
<dario.heini...@gmail.com<mailto:dario.heini...@gmail.com>> wrote:

It depends on the use case; in Shilpa's use case it is about users, so the 
user ids are probably known beforehand.

https://dpaste.org/cRe3G <= This is an example without a window, but 
essentially, Shilpa, you would be re-registering the timers every time they fire.
You would also have to ingest the user ids beforehand into your pipeline, so 
that a user who never has any event still gets a notification. So probably 
ingest the user ids on startup with a single source from the DB.

My example is pretty minimal, but the idea in your case stays the same (a rough 
sketch follows after this list):

- key by user
- have a co-process function to init the state with the user ids
- re-register the timers every time they fire
- use `env.getConfig().setAutoWatermarkInterval(1000)` to move the event time 
forward even if there is no data coming in (this is what you are probably 
looking for!!)
- then collect an Optional/custom struct/null or so, depending on whether data 
is present or not
- and then you can check whether the event was triggered because there was data 
or because there wasn't
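
A very rough sketch of that wiring (untested; UserId, Event, Notification and 
the fixed 20-minute window size are placeholders for your own types and config):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

case class UserId(id: String)                        // from the DB source
case class Event(userId: String, value: Double)      // from Kafka
case class Notification(userId: String, count: Long)

class ZeroEventNotifier extends KeyedCoProcessFunction[String, UserId, Event, Notification] {

  private val windowSize = 20 * 60 * 1000L           // assumed 20-minute window
  private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit =
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))

  // stream 1: user ids ingested on startup; start the timer loop for each key
  override def processElement1(user: UserId,
      ctx: KeyedCoProcessFunction[String, UserId, Event, Notification]#Context,
      out: Collector[Notification]): Unit =
    ctx.timerService().registerEventTimeTimer(
      ctx.timerService().currentWatermark() + windowSize)

  // stream 2: real events; just count them per key
  override def processElement2(event: Event,
      ctx: KeyedCoProcessFunction[String, UserId, Event, Notification]#Context,
      out: Collector[Notification]): Unit = {
    val c = Option(count.value()).map(_.longValue()).getOrElse(0L)
    count.update(c + 1)
  }

  // every time the timer fires: emit the count (possibly 0) and re-register
  override def onTimer(timestamp: Long,
      ctx: KeyedCoProcessFunction[String, UserId, Event, Notification]#OnTimerContext,
      out: Collector[Notification]): Unit = {
    val c = Option(count.value()).map(_.longValue()).getOrElse(0L)
    out.collect(Notification(ctx.getCurrentKey, c))
    count.clear()
    ctx.timerService().registerEventTimeTimer(timestamp + windowSize)
  }
}

// usage: userIds.connect(events).keyBy(_.id, _.userId).process(new ZeroEventNotifier)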

Best regards,

Dario
On 09.05.22 15:19, Andrew Otto wrote:
This sounds similar to a non-streaming problem we had at WMF.  We ingest all 
event data from Kafka into HDFS/Hive and partition the Hive tables into hourly 
directories.  If there are no events in a Kafka topic for a given hour, we have 
no way of knowing whether the hour has been ingested successfully.  For all we 
know, the upstream producer pipeline might be broken.

We solved this by emitting artificial 'canary' events into each topic multiple 
times an hour.  The canary events producer uses the same code pathways and 
services that (most of) our normal event producers do.  Then, when ingesting 
into Hive, we filter out the canary events.  The ingestion code then always has 
work to do and can mark an hour as complete, even if it ends up writing no 
events to it.

Perhaps you could do the same?  Always emit artificial events, and filter them 
out in your windowing code? The window should still fire since it will always 
have events, even if you don't use them?
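
If you went that route, one way (rough sketch, assuming a hypothetical isCanary 
flag on your events) is to let the canaries reach the window so it always fires, 
and only drop them inside the window function:

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Event(userId: String, isCanary: Boolean, value: Double)
case class WindowResult(userId: String, realEventCount: Int)

class DropCanaries extends ProcessWindowFunction[Event, WindowResult, String, TimeWindow] {
  override def process(key: String, context: Context,
                       elements: Iterable[Event], out: Collector[WindowResult]): Unit = {
    // the window fired because of the canaries, but only real events are counted
    val real = elements.count(e => !e.isCanary)
    out.collect(WindowResult(key, real))   // realEventCount == 0 => "no events" alert
  }
}

// usage: events.keyBy(_.userId)
//   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
//   .process(new DropCanaries)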




On Mon, May 9, 2022 at 8:55 AM Shilpa Shankar 
<sshan...@bandwidth.com<mailto:sshan...@bandwidth.com>> wrote:
Hello,
We are building a Flink use case where we are consuming from a Kafka topic, 
performing aggregations, and generating alerts based on average, max, and min 
thresholds. We also need to notify the users when there are 0 events in a 
Tumbling Event Time Window. We are having trouble coming up with a solution to 
do the same. The options we considered are below; please let us know if there 
are other ideas we haven't looked into.

[1] Queryable State: Save the keys in each of the Process Window Functions. 
Query the state from an external application and alert when a key is missing 
after the 20min time interval has expired. We see the Queryable State feature is 
being deprecated in the future. We do not want to go down this path when we 
already know there is an EOL for it.

[2] Use Processing Time Windows: Using processing time instead of event time 
would have been an option if our downstream applications sent out events in 
real time. Maintenance of the downstream applications, delays, etc. would 
result in a lot of data loss, which is undesirable.

Flink version : 1.14.3

Thanks,
Shilpa
