[ https://issues.apache.org/jira/browse/FLINK-12215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu reassigned FLINK-12215:
-------------------------------

    Assignee: Jark Wu

> Introduce SqlProcessFunction for blink streaming runtime
> --------------------------------------------------------
>
>                 Key: FLINK-12215
>                 URL: https://issues.apache.org/jira/browse/FLINK-12215
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Runtime
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>            Priority: Major
>
> Currently, we rely heavily on DataStream's {{KeyedProcessFunction}} when implementing the Blink SQL runtime. However, it has some disadvantages that lead us to introduce a SQL-specific ProcessFunction.
>
> 1. {{KeyedProcessFunction}} does not support {{endInput(Collector)}}.
> This is needed to achieve the same semantics for batch and stream. For example, {{SELECT COUNT(\*) FROM T}} should return {{0}} when the input is empty, but currently no result is emitted at all. That's why we need {{endInput(Collector)}} to emit a final result. I know this is not a real-world streaming use case, but it is worth doing.
>
> 2. {{KeyedProcessFunction}} is an abstract class.
> As discussed in FLINK-11409, if it were an interface, it would be easy to extract some common logic into a base class and share it between ProcessFunction, CoProcessFunction and other functions. This doesn't work while it is an abstract class. We also ran into this problem when we wanted to reuse some code. However, it's hard to turn {{KeyedProcessFunction}} into an interface because of compatibility.
>
> 3. {{KeyedProcessFunction}} doesn't expose {{setCurrentKey}}.
> We have an optimization for lazy state writing, i.e. buffering the changes on the heap and flushing them to state when taking a snapshot. That requires changing the current key of the operator/function.
>
> That's why we want to introduce a SQL-specific {{ProcessFunction}} interface. Maybe we can call it {{SqlProcessFunction}}. The name can be discussed in the JIRA.
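To make point 1 concrete, here is a minimal sketch, under stated assumptions, of how an {{endInput}} hook would let a non-keyed {{SELECT COUNT(\*) FROM T}} match the batch result on empty input. It is not Flink code: {{Collector}} and the trimmed interface (timers omitted) are local stand-ins so the file compiles without Flink, the {{count}} field stands in for keyed state, and {{CountStar}} and the {{run}} driver are hypothetical names that simulate an operator feeding rows and then signalling the end of input.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: local stand-ins replace the Flink types so this compiles on its own.
public class SqlProcessFunctionSketch {

    // Hypothetical stand-in for org.apache.flink.util.Collector.
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical, trimmed-down version of the proposed interface (timers omitted).
    interface SqlProcessFunction<K, I, O> {
        void processElement(I value, Context<K> ctx, Collector<O> out) throws Exception;

        void endInput(Collector<O> out) throws Exception;

        interface Context<K> {
            K getCurrentKey();

            void setCurrentKey(K key);
        }
    }

    // Non-keyed COUNT(*): emits the running count for each row and,
    // if no row arrived, emits 0 once the bounded input ends.
    static class CountStar implements SqlProcessFunction<Void, String, Long> {
        private long count = 0L; // would be keyed state in a real runtime

        @Override
        public void processElement(String value, Context<Void> ctx, Collector<Long> out) {
            count++;
            out.collect(count);
        }

        @Override
        public void endInput(Collector<Long> out) {
            if (count == 0L) {
                out.collect(0L); // the result the batch query returns for empty input
            }
        }
    }

    // Hypothetical driver standing in for the operator: feed all rows, then end the input.
    static List<Long> run(List<String> rows) {
        List<Long> results = new ArrayList<>();
        Collector<Long> out = results::add;
        SqlProcessFunction.Context<Void> ctx = new SqlProcessFunction.Context<Void>() {
            public Void getCurrentKey() { return null; } // global aggregate: single null key
            public void setCurrentKey(Void key) { }
        };
        CountStar fn = new CountStar();
        for (String row : rows) {
            fn.processElement(row, ctx, out);
        }
        fn.endInput(out);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of()));         // [0]
        System.out.println(run(List.of("a", "b"))); // [1, 2]
    }
}
```

Emitting {{0}} only when no row was seen keeps {{endInput}} from repeating the last running count when the input does contain rows.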
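> The initial idea of {{SqlProcessFunction}}:
> {code:java}
> import org.apache.flink.api.common.functions.Function;
> import org.apache.flink.streaming.api.TimeDomain;
> import org.apache.flink.streaming.api.TimerService;
> import org.apache.flink.util.Collector;
>
> public interface SqlProcessFunction<K, I, O> extends Function {
>
>     // Called for each input record.
>     void processElement(I value, Context<K> ctx, Collector<O> out) throws Exception;
>
>     // Called when a registered timer fires.
>     void onTimer(long timestamp, OnTimerContext<K> ctx, Collector<O> out) throws Exception;
>
>     // Called once when the input ends, e.g. to emit a final result.
>     void endInput(Collector<O> out) throws Exception;
>
>     interface Context<K> {
>         TimerService timerService();
>
>         K getCurrentKey();
>
>         // Not exposed by KeyedProcessFunction; needed for lazy state writing.
>         void setCurrentKey(K key);
>     }
>
>     interface OnTimerContext<K> extends Context<K> {
>         TimeDomain timeDomain();
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)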