自定义的trigger,实现满足maxcount或者到达窗口结束时间时输出结果;
问题:同一个窗口在到达窗口结束时间时,onProcessingTime会被触发多次;理论上每个窗口只应该在到达窗口结束时间时触发一次,这是什么原因?
主类代码片段:
// Pipeline fragment: applies a 5-second timeWindowAll window to afterMap, attaches the
// custom CountAndProcessingTimeTrigger with a max count of 100, and processes each firing
// with a new instance of the window function obtained from simpleConfig.
// NOTE(review): afterMap and simpleConfig are defined outside this snippet.
SingleOutputStreamOperator<List<String>> windowMap =
afterMap.timeWindowAll(Time.seconds(5))
.trigger(new CountAndProcessingTimeTrigger(
100))
.process(simpleConfig.getWindowFunction().newInstance())
触发器代码:
/**
 * Trigger that fires and purges a {@link TimeWindow} either when the number of elements
 * counted since the last firing reaches {@code maxCount}, or when the processing-time
 * timer registered for the window's max timestamp fires.
 *
 * <p>The element count is kept in a per-window {@link ReducingState} and is cleared on
 * every firing and when the window is cleared.
 *
 * <p>NOTE(review): {@code log} is not declared in this class as shown; it is assumed to be
 * provided elsewhere (for example by a logging annotation) - confirm.
 */
public class CountAndProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /** Maximum number of elements collected before the window is fired by count. */
    private final long maxCount;

    /** Descriptor of the per-window state holding the running element count. */
    private final ReducingStateDescriptor<Long> stateDesc;

    /**
     * Creates a trigger that fires once {@code maxCount} elements have been counted.
     *
     * @param maxCount number of elements that causes a count-based firing
     */
    public CountAndProcessingTimeTrigger(long maxCount) {
        this.stateDesc = new ReducingStateDescriptor<>("count_time", new
                CountAndProcessingTimeTrigger.Sum(),
                LongSerializer.INSTANCE);
        this.maxCount = maxCount;
    }

    /**
     * Called for every element added to the window. Registers the processing-time timer for
     * the window's max timestamp, increments the count, and fires when the count reaches
     * {@code maxCount}.
     *
     * <p>Possible results:
     * CONTINUE - do nothing;
     * FIRE - evaluate the window and keep its contents;
     * PURGE - drop the window contents, keeping window/trigger meta information;
     * FIRE_AND_PURGE - evaluate the window, then drop its contents.
     *
     * @param o              the element
     * @param timestamp      the element timestamp
     * @param window         the window the element was assigned to
     * @param triggerContext the trigger context
     * @return FIRE_AND_PURGE when the count reaches {@code maxCount}, otherwise CONTINUE
     * @throws Exception on state access failure
     */
    @Override
    public TriggerResult onElement(Object o, long timestamp, TimeWindow window,
                                   TriggerContext triggerContext)
            throws Exception {
        // NOTE(review): the timer is (re-)registered on every element. If an element is
        // assigned to this window after its end-of-window timer has already fired, this call
        // would register a fresh timer and lead to another onProcessingTime invocation for
        // the same window - confirm against the Flink version in use.
        triggerContext.registerProcessingTimeTimer(window.maxTimestamp());

        ReducingState<Long> countState =
                triggerContext.getPartitionedState(stateDesc);
        countState.add(1L);

        // Read the count once; it is non-null here because a value was just added.
        long currentCount = countState.get();
        if (currentCount >= maxCount) {
            log.info("countTrigger: {}", currentCount);
            countState.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when a processing-time timer fires. Fires and purges the window when the timer
     * is the one registered for the window's max timestamp.
     *
     * @param timestamp      the timestamp of the timer that fired
     * @param window         the window the timer belongs to
     * @param triggerContext the trigger context
     * @return FIRE_AND_PURGE for the end-of-window timer, otherwise CONTINUE
     * @throws Exception on state access failure
     */
    @Override
    public TriggerResult onProcessingTime(long timestamp, TimeWindow window,
                                          TriggerContext triggerContext)
            throws Exception {
        // Only the end-of-window timer is registered by this trigger; ignore anything else.
        if (timestamp != window.maxTimestamp()) {
            return TriggerResult.CONTINUE;
        }

        ReducingState<Long> countState =
                triggerContext.getPartitionedState(stateDesc);
        // Log the actual current processing time; the original passed window.maxTimestamp()
        // under the "currentProcessingTime" label.
        log.info("timeTrigger: {}, currentProcessingTime:{}", countState.get(),
                triggerContext.getCurrentProcessingTime());
        countState.clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    /**
     * Event-time timers are not used by this trigger.
     *
     * @return always CONTINUE
     */
    @Override
    public TriggerResult onEventTime(long timestamp, TimeWindow window,
                                     TriggerContext triggerContext)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Reports that this trigger does not support window merging.
     *
     * @return always {@code false}
     */
    @Override
    public boolean canMerge() {
        return false;
    }

    /**
     * Merges the count state and re-registers the end-of-window timer if it is still in the
     * future.
     *
     * <p>NOTE(review): {@link #canMerge()} returns {@code false}, so this is presumably never
     * invoked - confirm whether merging support is intended.
     *
     * @param window the merged window
     * @param ctx    the merge context
     */
    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.mergePartitionedState(stateDesc);
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    /**
     * Called when the window is removed. Deletes the end-of-window timer and clears the
     * count state.
     *
     * @param window         the window being removed
     * @param triggerContext the trigger context
     * @throws Exception on state access failure
     */
    @Override
    public void clear(TimeWindow window, TriggerContext triggerContext) throws
            Exception {
        triggerContext.deleteProcessingTimeTimer(window.maxTimestamp());
        triggerContext.getPartitionedState(stateDesc).clear();
    }

    /**
     * Reduce function that adds two counts together.
     */
    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Sum() {
        }

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
| |
吴先生
|
|
[email protected]
|