Hi Maxim,

I think your problem should be solvable with the CEP library:

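import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

// Enclosing class; the name StreamingJob is only a placeholder
public class StreamingJob {

   public static void main(String[] args) throws Exception {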
      // set up the streaming execution environment
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      DataStream<Event> input = ...;

      Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
            new SimpleCondition<Event>() {
               @Override
               public boolean filter(Event event) {
                  return event.isForward();
               }
            }
      ).followedBy("end").where(
            new SimpleCondition<Event>() {
               @Override
               public boolean filter(Event event) {
                  return event.isCancel();
               }
            }
      ).within(Time.minutes(1));

      PatternStream<Event> patternStream = CEP.pattern(input, pattern);

      OutputTag<Alert> outputTag = new OutputTag<Alert>("side-output"){};

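      // processMatch emits nothing, so the main output is ignored;
      // the alerts arrive on the side output
      SingleOutputStreamOperator<Alert> ignored =
            patternStream.process(new MyPatternProcessFunction(outputTag));

      // forward events that were not cancelled in time; attach a sink here
      final DataStream<Alert> sideOutput = ignored.getSideOutput(outputTag);

      // execute program
      env.execute("Flink Streaming Java API Skeleton");
   }

   // Not generic: type parameters named Event and Alert would shadow the
   // Event and Alert classes and make "new Alert(...)" impossible
   public static class MyPatternProcessFunction extends PatternProcessFunction<Event, Alert>
         implements TimedOutPartialMatchHandler<Event> {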
      private final OutputTag<Alert> outputTag;

      public MyPatternProcessFunction(OutputTag<Alert> outputTag) {
         this.outputTag = outputTag;
      }

      @Override
      public void processMatch(Map<String, List<Event>> match, Context ctx,
            Collector<Alert> out) throws Exception {
         // nothing to do: a complete match means the forward event was
         // cancelled in time; we only care about the timeout case
      }

      @Override
      public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx)
            throws Exception {
         Event startEvent = match.get("start").get(0);
         ctx.output(outputTag, new Alert(startEvent));
      }
   }
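
   // Placeholder event type; replace it with your own class
   public static class Event {
      // hypothetical field: true for "forward", false for "cancel"
      private boolean forward;

      boolean isForward() { return forward; }

      boolean isCancel() { return !forward; }
   }

   // Placeholder alert wrapping the forward event that was not cancelled
   public static class Alert {
      private final Event event;

      public Alert(Event event) { this.event = event; }
   }
}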


What we are doing here is defining a pattern: a forward event followed by
any cancel event, both within 1 minute. If no cancel arrives in that time,
the partial match times out, and in the timeout handler we can create the
follow-up event.
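
To make the timeout semantics concrete, here is a small plain-Java model of
the same logic. It is not Flink code and only a sketch: the class
ForwardCancelModel, its methods and the String id are made up for
illustration, and it assumes events arrive in timestamp order. It tracks
pending forward events per id; in the Flink job that separation would come
from keying the input stream by the matching id (keyBy) before calling
CEP.pattern, since the pattern as written accepts any cancel after a forward.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "forward, then cancel within one minute" (not Flink code). */
final class ForwardCancelModel {
    // One-minute window; this model treats exactly 60 s as already too late.
    static final long TIMEOUT_MS = 60_000L;

    // Pending forward events: id -> timestamp of the forward event.
    private final Map<String, Long> pending = new LinkedHashMap<>();
    // Ids whose forward event timed out without a cancel, in emission order.
    private final List<String> forwarded = new ArrayList<>();

    /** Feeds one event; timestamps are assumed to be non-decreasing. */
    void onEvent(String id, boolean isCancel, long timestampMs) {
        advanceTo(timestampMs);
        if (isCancel) {
            // Cancel arrived in time, or there is nothing left to cancel (ignored).
            pending.remove(id);
        } else {
            pending.put(id, timestampMs);
        }
    }

    /** Advances time and emits every forward event whose window has elapsed. */
    void advanceTo(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= TIMEOUT_MS) {
                forwarded.add(entry.getKey());
                it.remove();
            }
        }
    }

    /** Ids that were passed on because no cancel arrived in time. */
    List<String> forwarded() {
        return forwarded;
    }
}
```

Feeding it a forward for "a" and "b", a cancel for "a" after 30 seconds, and
then advancing time past the window leaves only "b" in forwarded(), which is
the role the side output plays in the CEP job.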

Cheers,
Till

On Thu, Apr 30, 2020 at 12:48 PM Maxim Parkachov <lazy.gop...@gmail.com>
wrote:

> Hi everyone,
>
> I need to implement the following functionality. There is a Kafka topic
> where "forward" events arrive, and the same topic also carries "cancel"
> events. For each "forward" event I need to wait 1 minute for a possible
> "cancel" event. I can uniquely match the two events. If a "cancel" event
> arrives within this minute, I need to drop the "forward" event; otherwise
> I pass the "forward" event on to another Kafka topic. "Cancel" events older
> than 1 minute can be ignored.
>
> I was thinking of implementing a ProcessFunction that puts "forward" events
> in state with a timer. If a "cancel" event arrives, I delete the "forward"
> event from state.
>
> My question: is there a simpler way to implement the same logic,
> possibly with CEP?
>
> Thanks,
> Maxim.
>
