Flink 版本:1.13.5
函数完整代码如下:
```
public class Top2RetractTableAggregateFunction extends
TableAggregateFunction<Tuple2<Long, Integer>,
Top2RetractTableAggregateFunction.Top2RetractAccumulator> {
private static final Logger LOG =
LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);
// Top2 聚合中间结果数据结构
public static class Top2RetractAccumulator {
public long beforeFirst = 0;
public long beforeSecond = 0;
public long afterFirst = 0;
public long afterSecond = 0;
}
// 创建 Top2Accumulator 累加器并做初始化
@Override
public Top2RetractAccumulator createAccumulator() {
LOG.info("[INFO] createAccumulator ...........................");
Top2RetractAccumulator acc = new Top2RetractAccumulator();
acc.beforeFirst = Integer.MIN_VALUE;
acc.beforeSecond = Integer.MIN_VALUE;
acc.afterFirst = Integer.MIN_VALUE;
acc.afterSecond = Integer.MIN_VALUE;
return acc;
}
// 接收输入元素并累加到 Accumulator 数据结构
public void accumulate(Top2RetractAccumulator acc, Long value) {
LOG.info("[INFO] accumulate ...........................");
if (value > acc.afterFirst) {
acc.afterSecond = acc.afterFirst;
acc.afterFirst = value;
} else if (value > acc.afterSecond) {
acc.afterSecond = value;
}
}
// 带撤回的输出
public void emitUpdateWithRetract(Top2RetractAccumulator acc,
RetractableCollector<Tuple2<Long, Integer>> out) {
LOG.info("[INFO] emitUpdateWithRetract ...........................");
if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {
// 撤回旧记录
if (acc.beforeFirst != Integer.MIN_VALUE) {
out.retract(Tuple2.of(acc.beforeFirst, 1));
}
// 输出新记录
out.collect(Tuple2.of(acc.afterFirst, 1));
acc.beforeFirst = acc.afterFirst;
}
if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) {
// 撤回旧记录
if (acc.beforeSecond != Integer.MIN_VALUE) {
out.retract(Tuple2.of(acc.beforeSecond, 2));
}
// 输出新记录
out.collect(Tuple2.of(acc.afterSecond, 2));
acc.beforeSecond = acc.afterSecond;
}
}
}
```
调用完整代码如下:
```
// Streaming execution environment, parallelism 1.
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(1);

// The legacy planner is selected explicitly: with the default Blink planner in Flink 1.13 this
// function was rejected with "Could not find an implementation method 'emitValue'".
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .useOldPlanner()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);

// Sample (name, course, score) rows.
Row[] scoreRows = {
        Row.of("李雷", "语文", 78),
        Row.of("韩梅梅", "语文", 50),
        Row.of("李雷", "语文", 99),
        Row.of("韩梅梅", "语文", 80),
        Row.of("李雷", "英语", 90),
        Row.of("韩梅梅", "英语", 40),
        Row.of("李雷", "英语", 98),
        Row.of("韩梅梅", "英语", 88)
};
DataStream<Row> scores = streamEnv.fromElements(scoreRows);

// Expose the stream as view "stu_score" with columns name, course, score.
tableEnv.createTemporaryView("stu_score", scores, $("name"), $("course"), $("score"));

// Register the table aggregate function as a temporary system function named "Top2".
tableEnv.createTemporarySystemFunction("Top2", new Top2RetractTableAggregateFunction());

// Group by course, apply Top2 to the scores (output columns: score, rank), then print.
tableEnv.from("stu_score")
        .groupBy($("course"))
        .flatAggregate(call("Top2", $("score")).as("score", "rank"))
        .select($("course"), $("score"), $("rank"))
        .execute()
        .print();
```
在 2022-05-23 18:21:42,"sjf0115" <[email protected]> 写道:
>函数代码如下:<br/>```<br/>public class Top2RetractTableAggregateFunction extends
>TableAggregateFunction<Tuple2<Long, Integer>,
>Top2RetractTableAggregateFunction.Top2RetractAccumulator> {<br/> private
>static final Logger LOG =
>LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);<br/> //
>Top2 聚合中间结果数据结构<br/> public static class Top2RetractAccumulator {<br/>
> public long beforeFirst = 0;<br/> public long beforeSecond = 0;<br/>
> public long afterFirst = 0;<br/> public long afterSecond =
>0;<br/> }<br/><br/> // 创建 Top2Accumulator 累加器并做初始化<br/>
>@Override<br/> public Top2RetractAccumulator createAccumulator() {<br/>
> LOG.info("[INFO] createAccumulator ...........................");<br/>
> Top2RetractAccumulator acc = new Top2RetractAccumulator();<br/>
>acc.beforeFirst = Integer.MIN_VALUE;<br/> acc.beforeSecond =
>Integer.MIN_VALUE;<br/> acc.afterFirst = Integer.MIN_VALUE;<br/>
>acc.afterSecond = Integer.MIN_VALUE;<br/> return acc;<br/>
>}<br/><br/> // 接收输入元素并累加到 Accumulator 数据结构<br/> public void
>accumulate(Top2RetractAccumulator acc, Long value) {<br/>
>LOG.info("[INFO] accumulate ...........................");<br/> if
>(value > acc.afterFirst) {<br/> acc.afterSecond =
>acc.afterFirst;<br/> acc.afterFirst = value;<br/> } else if
>(value > acc.afterSecond) {<br/> acc.afterSecond = value;<br/>
> }<br/> }<br/><br/> // 带撤回的输出<br/> public void
>emitUpdateWithRetract(Top2RetractAccumulator acc,
>RetractableCollector<Tuple2<Long, Integer>> out) {<br/>
>LOG.info("[INFO] emitUpdateWithRetract ...........................");<br/>
> if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {<br/> //
>撤回旧记录<br/> if (acc.beforeFirst != Integer.MIN_VALUE) {<br/>
> out.retract(Tuple2.of(acc.beforeFirst, 1));<br/> }<br/>
> // 输出新记录<br/> out.collect(Tuple2.of(acc.afterFirst, 1));<br/>
> acc.beforeFirst = acc.afterFirst;<br/> }<br/> if
>(!Objects.equals(acc.afterSecond, acc.beforeSecond)) {<br/> //
>撤回旧记录<br/> if (acc.beforeSecond != Integer.MIN_VALUE) {<br/>
> out.retract(Tuple2.of(acc.beforeSecond, 2));<br/> }<br/>
> // 输出新记录<br/> out.collect(Tuple2.of(acc.afterSecond,
>2));<br/> acc.beforeSecond = acc.afterSecond;<br/> }<br/>
>}<br/>}<br/>```<br/>完整调用代码:<br/>```<br/>// 执行环境<br/>StreamExecutionEnvironment
>env =
>StreamExecutionEnvironment.getExecutionEnvironment();<br/>env.setParallelism(1);<br/>EnvironmentSettings
> settings = EnvironmentSettings<br/> .newInstance()<br/>
>.useOldPlanner() // Blink Planner 异常 Old Planner 可以<br/>
>.inStreamingMode()<br/> .build();<br/>StreamTableEnvironment tEnv =
>StreamTableEnvironment.create(env, settings);<br/><br/>DataStream<Row>
>sourceStream = env.fromElements(<br/> Row.of("李雷", "语文", 78),<br/>
> Row.of("韩梅梅", "语文", 50),<br/> Row.of("李雷", "语文", 99),<br/>
>Row.of("韩梅梅", "语文", 80),<br/> Row.of("李雷", "英语", 90),<br/>
>Row.of("韩梅梅", "英语", 40),<br/> Row.of("李雷", "英语", 98),<br/>
>Row.of("韩梅梅", "英语", 88)<br/>);<br/><br/>//
>注册虚拟表<br/>tEnv.createTemporaryView("stu_score", sourceStream, $("name"),
>$("course"), $("score"));<br/>//
>注册临时i系统函数<br/>tEnv.createTemporarySystemFunction("Top2", new
>Top2RetractTableAggregateFunction());<br/>//
>调用函数<br/>tEnv.from("stu_score")<br/> .groupBy($("course"))<br/>
>.flatAggregate(call("Top2", $("score")).as("score", "rank"))<br/>
>.select($("course"), $("score"), $("rank"))<br/> .execute()<br/>
>.print();<br/>```<br/>Flink 版本:1.13.5
>在 2022-05-23 09:55:40,"Xuyang" <[email protected]> 写道:
>>Hi, 可以将你的taf具体代码发出来吗?还有你的版本,也贴一下。
>>
>>
>>
>>
>>--
>>
>> Best!
>> Xuyang
>>
>>
>>
>>
>>
>>在 2022-05-22 22:35:46,"赢峰" <[email protected]> 写道:
>>>
>>>
>>>在自定义表聚合函数 TableAggregateFunction 时使用的是 emitUpdateWithRetract
>>>输出数据。在调用的时候参考文档的使用方式:
>>>```
>>>tEnv.from("stu_score")
>>> .groupBy($("course"))
>>> .flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score")))
>>> .select($("course"), $("f0"), $("f1"))
>>>```
>>>使用默认 blink Planner,会抛出如下异常:
>>>```
>>>Exception in thread "main" org.apache.flink.table.api.ValidationException:
>>>Could not find an implementation method 'emitValue' in class
>>>'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction'
>>>for function 'Top2' that matches the following signature:
>>>void
>>>emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator,
>>> org.apache.flink.util.Collector)
>>>```
>>>但是使用 Old Planner,则会正常输出:
>>>```
>>>StreamExecutionEnvironment env =
>>>StreamExecutionEnvironment.getExecutionEnvironment();
>>>env.setParallelism(1);
>>>EnvironmentSettings settings = EnvironmentSettings
>>> .newInstance()
>>> .useOldPlanner()
>>> .inStreamingMode()
>>> .build();
>>>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>>```
>>>这是什么地方使用有问题?
>>>
>>>
>>>
>>>
>>>