[ https://issues.apache.org/jira/browse/FLINK-5753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010121#comment-16010121 ]
Michał Jurkiewicz commented on FLINK-5753:
------------------------------------------

Hi [~kkl0u],

Here is what I set:
{code}
myInputStream.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator());
{code}
{code}
/**
 * This generator generates watermarks that are lagging behind processing time by a certain amount.
 * It assumes that elements arrive in Flink after at most a certain time.
 */
private static class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<Event> {

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The Constant OUT_OF_ORDERNESS_THRESHOLD_MILLIS. */
    private static final int OUT_OF_ORDERNESS_THRESHOLD_MILLIS = 5000;

    /* (non-Javadoc)
     * @see org.apache.flink.streaming.api.functions.TimestampAssigner#extractTimestamp(java.lang.Object, long)
     */
    @Override
    public long extractTimestamp(Event event, long previousElementTimestamp) {
        return event.getEventTimestamp().getTime();
    }

    /* (non-Javadoc)
     * @see org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks#getCurrentWatermark()
     */
    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - OUT_OF_ORDERNESS_THRESHOLD_MILLIS);
    }
}
{code}

Should I also add this configuration?
{code}
env.getConfig().setAutoWatermarkInterval(100L);
{code}

> CEP timeout handler.
> --------------------
>
>                 Key: FLINK-5753
>                 URL: https://issues.apache.org/jira/browse/FLINK-5753
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.1.2
>            Reporter: Michał Jurkiewicz
>            Assignee: Kostas Kloudas
>
> I configured the following Flink job in my environment:
> {code}
> Pattern<Event, ?> patternCommandStarted = Pattern.<Event>begin("event-accepted").subtype(Event.class)
>     .where(e -> {event accepted where statement})
>     .next("second-event-started").subtype(Event.class)
>     .where(e -> {event started where statement})
>     .within(Time.seconds(30));
> DataStream<Either<Event, Event>> events = CEP
>     .pattern(eventsStream.keyBy(e -> e.getEventProperties().get("deviceCode")), patternCommandStarted)
>     .select(eventSelector, eventSelector);
> static class EventSelector implements PatternSelectFunction<Event, Event>, PatternTimeoutFunction<Event, Event> {}
> {code}
> The problem that I have is related to timeout handling. I observed that
> if the first event appears, the second event does not appear in the stream,
> and *no new events appear in the stream*, the timeout handler is not executed.
> Expected result: the timeout handler should be executed even if there are no
> new events in the stream.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
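
A note on the timeout behaviour discussed in this thread. Assuming the CEP operator evaluates `within(...)` in event time and expires a partial match only when a watermark passes the end of the window (an assumption, not confirmed here for the affected version 1.1.2), a time-lag generator like the one quoted above lets the watermark advance without new events, provided periodic watermarks are actually emitted. The plain-Java sketch below does not use Flink at all. The class `TimeoutSketch` and its methods are hypothetical names, and the `>=` boundary is chosen only for illustration.

```java
public class TimeoutSketch {

    /** Watermark of a time-lag generator: processing time minus a fixed lag. */
    public static long timeLagWatermark(long processingTimeMillis, long lagMillis) {
        return processingTimeMillis - lagMillis;
    }

    /**
     * Assumed rule: a partial match that started at startTimestampMillis counts
     * as timed out once the watermark reaches the end of the "within" window.
     */
    public static boolean isTimedOut(long startTimestampMillis, long withinMillis, long watermarkMillis) {
        return watermarkMillis >= startTimestampMillis + withinMillis;
    }

    public static void main(String[] args) {
        long lagMillis = 5_000L;       // OUT_OF_ORDERNESS_THRESHOLD_MILLIS in the comment above
        long withinMillis = 30_000L;   // within(Time.seconds(30)) in the issue description
        long firstEventMillis = 0L;    // timestamp of the only event seen (illustrative value)

        // No further events arrive; only processing time moves forward.
        for (long now : new long[] {10_000L, 34_999L, 35_000L}) {
            long watermark = timeLagWatermark(now, lagMillis);
            System.out.println("now=" + now
                    + " watermark=" + watermark
                    + " timedOut=" + isTimedOut(firstEventMillis, withinMillis, watermark));
        }
    }
}
```

With a 5 s lag and a 30 s window, this model reports the timeout once processing time is 35 s past the first event's timestamp, whether or not further events arrive. It says nothing about how often Flink asks the generator for a watermark; that is what the auto watermark interval controls.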