Hi Maxim, I think your problem should be solvable with the CEP library:
public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Event> input = ...; Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where( new SimpleCondition<Event>() { @Override public boolean filter(Event event) { return event.isForward(); } } ).followedBy("end").where( new SimpleCondition<Event>() { @Override public boolean filter(Event event) { return event.isCancel(); } } ).within(Time.minutes(1)); PatternStream<Event> patternStream = CEP.pattern(input, pattern); OutputTag<Alert> outputTag = new OutputTag<Alert>("side-output"){}; SingleOutputStreamOperator<Alert> ignored = patternStream.process(new MyPatternProcessFunction<>(outputTag)); final DataStream<Alert> sideOutput = ignored.getSideOutput(outputTag); // execute program env.execute("Flink Streaming Java API Skeleton"); } public static class MyPatternProcessFunction<Event, Alert> extends PatternProcessFunction<Event, Alert> implements TimedOutPartialMatchHandler<Event> { private final OutputTag<Alert> outputTag; public MyPatternProcessFunction(OutputTag<Alert> outputTag) { this.outputTag = outputTag; } @Override public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Alert> out) throws Exception { // don't do anything since we want the time out case } @Override public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) throws Exception { Event startEvent = match.get("start").get(0); ctx.output(outputTag, new Alert(startEvent)); } } } public static class Event { boolean isForward() {} boolean isCancel() {} } public static class Alert {} So what we are doing here is to define a pattern forward followed by any cancel. Moreover we say that it must happen within 1 minute. If this does not happen then we will see a timeout where we can create the follow-up event. Cheers, Till On Thu, Apr 30, 2020 at 12:48 PM Maxim Parkachov <lazy.gop...@gmail.com> wrote: > Hi everyone, > > I need to implement following functionality. There is a kafka topic where > "forward" events are coming and in the same topic there are "cancel" > events. For each "forward" event I need to wait 1 minute for possible > "cancel" event. I can uniquely match both events. If "cancel" event comes > within this minute I need to delete "forward" event, otherwise pass > "forward" event further in another kafka topic. "Cancel" events older than > 1 minute could be ignored. > > I was thinking to implement ProcessFunction with putting "forward" events > in state with timer. If "cancel" event is coming I will delete "forward" > event from state. > > My question: Is there more simple way to implement the same logic, > possibly with CEP ? > > Thanks, > Maxim. >