[ 
https://issues.apache.org/jira/browse/FLINK-5753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010121#comment-16010121
 ] 

Michał Jurkiewicz commented on FLINK-5753:
------------------------------------------

Hi [~kkl0u],

Here is what I set:

{code}
myInputStream.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator());
{code}

{code}
/**
 * This generator generates watermarks that are lagging behind processing time by a certain amount.
 * It assumes that elements arrive in Flink after at most a certain time.
 */
private static class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<Event> {

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The Constant OUT_OF_ORDERNESS_THRESHOLD. */
    private static final int OUT_OF_ORDERNESS_THRESHOLD_MILLIS = 5000;

    /* (non-Javadoc)
     * @see org.apache.flink.streaming.api.functions.TimestampAssigner#extractTimestamp(java.lang.Object, long)
     */
    @Override
    public long extractTimestamp(Event event, long previousElementTimestamp) {
        return event.getEventTimestamp().getTime();
    }

    /* (non-Javadoc)
     * @see org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks#getCurrentWatermark()
     */
    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - OUT_OF_ORDERNESS_THRESHOLD_MILLIS);
    }
}
{code}

Should I also add this configuration?

{code}
env.getConfig().setAutoWatermarkInterval(100L);
{code}
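
To make the interaction between the time-lag watermark and the 30-second pattern window concrete, here is a minimal plain-Java sketch. It does not use Flink at all: the class and method names are hypothetical, and it assumes that a partial match counts as timed out once the watermark reaches the first event's timestamp plus the window length (the exact boundary condition inside Flink CEP may differ). It only illustrates that a watermark derived from processing time keeps advancing on each periodic poll, even when no further events arrive.

```java
// Plain-Java sketch (no Flink dependency); every name here is hypothetical.
final class TimeLagWatermarkSketch {

    /** Watermark that trails processing time by a fixed lag, mirroring getCurrentWatermark(). */
    static long watermarkAt(long processingTimeMillis, long lagMillis) {
        return processingTimeMillis - lagMillis;
    }

    /** Assumption: a partial match times out once the watermark reaches start + window. */
    static boolean timeoutDue(long firstEventTimestamp, long windowMillis, long watermark) {
        return watermark >= firstEventTimestamp + windowMillis;
    }

    /**
     * Polls the watermark every intervalMillis of processing time, starting at startMillis.
     * Returns the first poll time at which the timeout is due, or -1 if none occurs
     * within maxPolls polls.
     */
    static long firstPollWithTimeout(long startMillis, long intervalMillis, long lagMillis,
                                     long firstEventTimestamp, long windowMillis, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            long now = startMillis + i * intervalMillis;
            if (timeoutDue(firstEventTimestamp, windowMillis, watermarkAt(now, lagMillis))) {
                return now;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long firstEvent = 1_000L;
        // 100 ms poll interval, 5 s lag, 30 s window: the values used in this comment and the issue.
        long poll = firstPollWithTimeout(firstEvent, 100L, 5_000L, firstEvent, 30_000L, 1_000);
        System.out.println("timeout becomes due at processing time " + poll);
    }
}
```

Under these assumptions the timeout becomes due roughly lag + window after the first event, without any later element being required. The poll interval only affects how promptly that moment is noticed.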

> CEP timeout handler.
> --------------------
>
>                 Key: FLINK-5753
>                 URL: https://issues.apache.org/jira/browse/FLINK-5753
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.1.2
>            Reporter: Michał Jurkiewicz
>            Assignee: Kostas Kloudas
>
> I configured the following flink job in my environment:
> {code}
> Pattern<Event, ?> patternCommandStarted = Pattern.<Event>begin("event-accepted")
>   .subtype(Event.class)
>   .where(e -> {event accepted where statement})
>   .next("second-event-started")
>   .subtype(Event.class)
>   .where(e -> {event started where statement})
>   .within(Time.seconds(30));
> DataStream<Either<Event, Event>> events = CEP
>   .pattern(eventsStream.keyBy(e -> e.getEventProperties().get("deviceCode")), patternCommandStarted)
>   .select(eventSelector, eventSelector);
> static class EventSelector implements PatternSelectFunction<Event, Event>, PatternTimeoutFunction<Event, Event> {}
> {code}
> The problem I have is related to timeout handling. I observed that if the
> first event appears, the second event does not appear in the stream, and
> *no new events appear in the stream*, the timeout handler is not executed.
> Expected result: the timeout handler should be executed even if no new
> events appear in the stream.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
