Hi all

Currently, Flink only supports a limited set of window functions, such as TUMBLE, HOP and
SESSION. They are very useful in many use cases, but sometimes they
cannot meet business requirements that need more complicated window logic.
Here are some requirements, all on keyed streams, that cannot be satisfied by
the current window functions:

In an ad system, we want to accumulate the show count of each ad over a
fixed time window such as 1 hour. However, we cannot start an ad's window
before some specified action on that ad happens, so each ad has a
different window range.
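To make the first requirement concrete, here is a rough plain-Java sketch (not Flink API code) of the per-key logic we have in mind. It assumes, as our own interpretation, that each ad gets tumbling windows aligned to the timestamp of its first "start" action, that only events named "show" are counted, and that results are emitted once event time passes the window end. The class PerAdTumblingCounter and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not a Flink API: per-ad tumbling windows whose
// alignment starts at that ad's own start action.
class PerAdTumblingCounter {
    // One emitted result: the ad, its window range [start, end) and the show count.
    record Result(String adId, long windowStart, long windowEnd, long showCount) {}

    private final long sizeMillis;
    private final String startAction;
    // Per-key state: when the ad's start action happened.
    private final Map<String, Long> startTimes = new HashMap<>();
    // Per-key state: show count per window start.
    private final Map<String, Map<Long, Long>> counts = new HashMap<>();

    PerAdTumblingCounter(long sizeMillis, String startAction) {
        this.sizeMillis = sizeMillis;
        this.startAction = startAction;
    }

    void onEvent(String adId, String eventName, long ts) {
        if (eventName.equals(startAction)) {
            startTimes.putIfAbsent(adId, ts); // only the first start action fixes the alignment
            return;
        }
        Long start = startTimes.get(adId);
        // Assumed: drop non-show events and shows that arrive before the ad's window starts.
        if (!eventName.equals("show") || start == null || ts < start) {
            return;
        }
        long windowStart = start + ((ts - start) / sizeMillis) * sizeMillis;
        counts.computeIfAbsent(adId, k -> new HashMap<>()).merge(windowStart, 1L, Long::sum);
    }

    // Emits and clears every window whose end is <= watermark (event-time progress).
    List<Result> advanceTo(long watermark) {
        List<Result> out = new ArrayList<>();
        for (Map.Entry<String, Map<Long, Long>> perAd : counts.entrySet()) {
            Iterator<Map.Entry<Long, Long>> it = perAd.getValue().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> w = it.next();
                long end = w.getKey() + sizeMillis;
                if (end <= watermark) {
                    out.add(new Result(perAd.getKey(), w.getKey(), end, w.getValue()));
                    it.remove();
                }
            }
        }
        return out;
    }
}
```

The point is that the window offset is per key state rather than a global constant, which TUMBLE cannot express today.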

A more complicated scenario: for each ad, we start the window after some
specified event happens, but we need a global window to accumulate some
metrics of that ad. Each time a new metric event for the ad comes in, the
global window fires. In addition, the global window fires periodically every
hour to report the current status to downstream consumers, even if no new ad
events arrive. The emitted result can be considered as (ad_id,
window_start_time, current_time, metric_number). The downstream consumer may
be a strategy rule engine that takes action according to the result. To make
it even more complicated, to avoid OOM issues caused by the global window
state accumulating indefinitely, we can send a purge message in a special
format to the stream to let the window function purge the global window once
we no longer need that ad.
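The second scenario is essentially a small per-key state machine. Below is a plain-Java sketch (again not Flink API code) under these assumptions of ours: the start event, metric events and purge message are delivered as separate method calls, the periodic fire is driven by an external timer tick, and the emitted row is (ad_id, window_start_time, current_time, metric_number). AdGlobalWindow and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not a Flink API: a per-ad global window that opens on a
// start event, fires on every metric event, fires periodically, and can be purged.
class AdGlobalWindow {
    // Emitted row: (ad_id, window_start_time, current_time, metric_number).
    record Emission(String adId, long windowStartTime, long currentTime, long metric) {}

    // Per-ad window state.
    private static final class State {
        final long windowStart;
        long sum;
        long nextPeriodicFire;

        State(long windowStart, long nextPeriodicFire) {
            this.windowStart = windowStart;
            this.nextPeriodicFire = nextPeriodicFire;
        }
    }

    private final long periodMillis;
    private final Map<String, State> states = new HashMap<>();

    AdGlobalWindow(long periodMillis) {
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        this.periodMillis = periodMillis;
    }

    // The start event opens the ad's global window; later start events are ignored.
    void onStart(String adId, long ts) {
        states.putIfAbsent(adId, new State(ts, ts + periodMillis));
    }

    // Each metric event is accumulated and fires the window; returns null if no window is open.
    Emission onMetric(String adId, long value, long ts) {
        State s = states.get(adId);
        if (s == null) {
            return null;
        }
        s.sum += value;
        return new Emission(adId, s.windowStart, ts, s.sum);
    }

    // Timer tick: fire every window whose periodic timer is due, even without new events.
    List<Emission> onTimer(long now) {
        List<Emission> out = new ArrayList<>();
        for (Map.Entry<String, State> e : states.entrySet()) {
            State s = e.getValue();
            if (s.nextPeriodicFire <= now) {
                out.add(new Emission(e.getKey(), s.windowStart, now, s.sum));
                // Schedule the next fire strictly after now, skipping missed periods.
                while (s.nextPeriodicFire <= now) {
                    s.nextPeriodicFire += periodMillis;
                }
            }
        }
        return out;
    }

    // A purge message drops all state of the ad, so the global window cannot grow forever.
    void onPurge(String adId) {
        states.remove(adId);
    }
}
```

Note that after a purge, metric events for that ad are dropped until a new start event reopens the window; whether that is the desired behavior is one of the details the UDF should let users decide.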


These are classic use cases that cannot be supported purely in SQL right
now. Enabling customized window UDFs would be very helpful and would greatly
enrich the functionality of Flink SQL.




Public Interfaces
We need to extend the current SQL semantics to support defining window UDFs:
create [TEMPORARY|TEMPORARY SYSTEM] window function [IF NOT EXISTS] 
[catalog_name.][db_name.]function_name AS identifier (col_name col_type, ...)

TEMPORARY
Creates a temporary catalog function that has catalog and database namespaces
and overrides catalog functions.
TEMPORARY SYSTEM
Creates a temporary system function that has no namespace and overrides
built-in functions.
IF NOT EXISTS
If the function already exists, nothing will happen.
identifier
The fully qualified class name of the window function.
col_name col_type, ...
The result columns emitted by the window.
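To pin down what the options above would mean at registration time, here is a small plain-Java model of a function registry. It is a sketch under our own assumptions, not Flink's catalog API: catalog and temporary catalog functions are keyed by a fully qualified catalog.db.name string, IF NOT EXISTS turns a duplicate into a no-op, and unqualified names resolve in the order temporary system, built-in, temporary catalog, catalog, which we assume window functions would share with Flink's existing function resolution. WindowFunctionRegistry and its members are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch, not Flink's catalog API: how
// CREATE [TEMPORARY|TEMPORARY SYSTEM] WINDOW FUNCTION [IF NOT EXISTS] could be applied.
class WindowFunctionRegistry {
    enum Kind { CATALOG, TEMPORARY, TEMPORARY_SYSTEM }

    // identifier = fully qualified class name; columns = result columns, in order.
    record Definition(String className, List<String> columns) {}

    private final Map<String, Definition> builtIn = new HashMap<>();     // key: function name
    private final Map<String, Definition> tempSystem = new HashMap<>();  // key: function name (no namespace)
    private final Map<String, Definition> tempCatalog = new HashMap<>(); // key: catalog.db.function
    private final Map<String, Definition> catalog = new HashMap<>();     // key: catalog.db.function

    void registerBuiltIn(String name, Definition def) {
        builtIn.put(name, def);
    }

    // Returns true if a function was created, false if IF NOT EXISTS skipped an existing one.
    boolean create(Kind kind, String name, Definition def, boolean ifNotExists) {
        Map<String, Definition> target = switch (kind) {
            case CATALOG -> catalog;
            case TEMPORARY -> tempCatalog;
            case TEMPORARY_SYSTEM -> tempSystem;
        };
        if (target.containsKey(name)) {
            if (ifNotExists) {
                return false; // IF NOT EXISTS: nothing happens
            }
            throw new IllegalStateException("Function already exists: " + name);
        }
        target.put(name, def);
        return true;
    }

    // Resolves an unqualified name against the current catalog and database.
    Optional<Definition> resolve(String name, String currentCatalog, String currentDb) {
        String qualified = currentCatalog + "." + currentDb + "." + name;
        Definition[] candidates = {
            tempSystem.get(name),       // temporary system functions override built-ins
            builtIn.get(name),
            tempCatalog.get(qualified), // temporary catalog functions override catalog functions
            catalog.get(qualified)
        };
        for (Definition d : candidates) {
            if (d != null) {
                return Optional.of(d);
            }
        }
        return Optional.empty();
    }
}
```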


For example:
create temporary window function ad_sum_window as
  'com.examples.AdSumWindowFunction';

select
  ad_id,
  ad_sum_window_window_start_time,
  ad_sum_window_current_time,
  ad_sum_window_sum_value as convert_cnt
from event_streams
group by
  ad_id, ad_sum_window('start', convert_cnt, interval '1' hour);

ad_sum_window is a customized window function defined in
com.examples.AdSumWindowFunction; a detailed interface design for this new
kind of function will be introduced later. This window function waits for
event_name = 'start' from event_streams before it begins accumulating the
convert_cnt metric. The window also fires periodically every hour to report
the current status. All we need to know for now is that the produced columns
of ad_sum_window can be inferred from its class definition. For example, if
ad_sum_window produces (window_start_time timestamp, current_time timestamp,
sum_value bigint), Flink will automatically expose
<udf_name>_<column_name> columns in the select list. This is a little tricky,
but in FLIP-145 some select columns such as window_start and window_end are
also generated from window semantics. Other good ideas for this are very
welcome.
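One way to see why the <udf_name>_<column_name> rule is a little tricky: the generated names live in the same namespace as the input columns. The plain-Java sketch below expands a UDF's declared result columns into select-list names and rejects a clash with an existing input column. Failing on a clash is our assumption for illustration, not something decided in this proposal, and WindowColumnNaming is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the proposed naming rule: each column a window UDF produces
// is exposed in the select list as <udf_name>_<column_name>.
class WindowColumnNaming {
    // producedColumns: column name -> SQL type, in declaration order.
    // inputColumns: column names already present in the input table.
    static Map<String, String> exposedColumns(
            String udfName, Map<String, String> producedColumns, Set<String> inputColumns) {
        Map<String, String> exposed = new LinkedHashMap<>();
        for (Map.Entry<String, String> col : producedColumns.entrySet()) {
            String name = udfName + "_" + col.getKey();
            // Assumed policy: a generated name must not shadow an existing input column.
            if (inputColumns.contains(name)) {
                throw new IllegalArgumentException("Generated column clashes with input column: " + name);
            }
            exposed.put(name, col.getValue());
        }
        return exposed;
    }
}
```

For ad_sum_window producing (window_start_time, current_time, sum_value), this yields ad_sum_window_window_start_time, ad_sum_window_current_time and ad_sum_window_sum_value, matching the select list in the example query.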




Proposed Changes
A new Calcite syntax for creating window functions.
A new user-defined window function design, similar to SourceFunction or
ScalarFunction, in which the following logic can be defined:

Customized window type

Window assigner

Trigger

Produced result format
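To make the four pieces above a bit more tangible, here is a very rough Java sketch of what such an interface could look like, plus an AdSumWindowFunction matching ad_sum_window('start', convert_cnt, interval '1' hour). Every type and method here (UserDefinedWindowFunction, AdEvent, AdSumWindow and so on) is hypothetical and only meant to start the discussion; it does not exist in Flink, and the real design would still need state backends, timers and type inference.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical interface, not an existing Flink API: one place to define the
// window type, assigner, trigger and produced result format of a window UDF.
interface UserDefinedWindowFunction<IN, W> {
    // Customized window type: create the per-key window state from the element that opens it.
    W createWindow(IN openingElement);

    // Window assigner: which element opens a window, and how later elements are folded in.
    boolean opensWindow(IN element);

    void accumulate(W window, IN element);

    // Trigger: fire on this element, and/or every periodicFireMillis() (<= 0 means never).
    boolean fireOnElement(IN element);

    long periodicFireMillis();

    // Produced result format: ordered column name -> SQL type, and the row emitted on firing.
    Map<String, String> producedColumns();

    Object[] emit(W window, long currentTime);
}

// Hypothetical input row of event_streams.
record AdEvent(String adId, String eventName, long convertCnt, long timestamp) {}

// Window state: start time plus running sum.
final class AdSumWindow {
    final long windowStart;
    long sum;

    AdSumWindow(long windowStart) {
        this.windowStart = windowStart;
    }
}

// Hypothetical implementation of ad_sum_window('start', convert_cnt, interval '1' hour).
class AdSumWindowFunction implements UserDefinedWindowFunction<AdEvent, AdSumWindow> {
    private final String startEvent;
    private final long intervalMillis;

    AdSumWindowFunction(String startEvent, long intervalMillis) {
        this.startEvent = startEvent;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public AdSumWindow createWindow(AdEvent openingElement) {
        return new AdSumWindow(openingElement.timestamp());
    }

    @Override
    public boolean opensWindow(AdEvent element) {
        return element.eventName().equals(startEvent);
    }

    @Override
    public void accumulate(AdSumWindow window, AdEvent element) {
        window.sum += element.convertCnt();
    }

    @Override
    public boolean fireOnElement(AdEvent element) {
        return false; // this example only reports periodically
    }

    @Override
    public long periodicFireMillis() {
        return intervalMillis;
    }

    @Override
    public Map<String, String> producedColumns() {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("window_start_time", "TIMESTAMP");
        cols.put("current_time", "TIMESTAMP");
        cols.put("sum_value", "BIGINT");
        return cols;
    }

    @Override
    public Object[] emit(AdSumWindow window, long currentTime) {
        return new Object[] {window.windowStart, currentTime, window.sum};
    }
}
```

Keeping producedColumns() on the function itself is what would let the planner infer the ad_sum_window_* columns without listing them in the DDL.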


Thanks
Suhan
