[ https://issues.apache.org/jira/browse/FLINK-20814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17259471#comment-17259471 ]
Yao Zhang commented on FLINK-20814:
-----------------------------------

Hi [~lprince],

I reproduced this issue: patternStream produces no output on Flink 1.12, whereas the same job works as expected on Flink 1.11. Adding inProcessingTime() after the call to CEP.pattern solves the problem, as shown below:

    DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
        .inProcessingTime() // Enable processing time
        .select(
            new RichPatternSelectFunction<TemperatureEvent, Alert>() {
                /** */
                private static final long serialVersionUID = 1L;

                @Override
                public void open(Configuration parameters) throws Exception {
                    System.out.println(getRuntimeContext().getUserCodeClassLoader());
                }

                @Override
                public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                    return new Alert("Temperature Rise Detected: " + event.get("start")
                        + " on machine name: " + event.get("start"));
                }
            });

The catch is that since Flink 1.12 the default stream time characteristic has been changed to EventTime. With your snippet, the pattern therefore runs in event time and waits until all elements whose event times fall within the 10-second window have arrived. That is why you got no output.

> The CEP code is not running properly
> ------------------------------------
>
>                 Key: FLINK-20814
>                 URL: https://issues.apache.org/jira/browse/FLINK-20814
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.12.0
>         Environment: flink1.12.0
> jdk1.8
>            Reporter: little-tomato
>            Priority: Blocker
>
> The cep code is running properly on flink1.11.2,but it is not working
> properly on flink1.12.0.
> Can somebody help me?
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // DataStream : source
> DataStream<TemperatureEvent> input = env.fromElements(
>     new TemperatureEvent(1,"Device01", 22.0),
>     new TemperatureEvent(1,"Device01", 27.1),
>     new TemperatureEvent(2,"Device01", 28.1),
>     new TemperatureEvent(1,"Device01", 22.2),
>     new TemperatureEvent(3,"Device01", 22.1),
>     new TemperatureEvent(1,"Device02", 22.3),
>     new TemperatureEvent(4,"Device02", 22.1),
>     new TemperatureEvent(1,"Device02", 22.4),
>     new TemperatureEvent(5,"Device02", 22.7),
>     new TemperatureEvent(1,"Device02", 27.0),
>     new TemperatureEvent(6,"Device02", 30.0));
>
> Pattern<TemperatureEvent, ?> warningPattern =
>     Pattern.<TemperatureEvent>begin("start")
>         .subtype(TemperatureEvent.class)
>         .where(new SimpleCondition<TemperatureEvent>() {
>             @Override
>             public boolean filter(TemperatureEvent subEvent) {
>                 if (subEvent.getTemperature() >= 26.0) {
>                     return true;
>                 }
>                 return false;
>             }
>         }).where(new SimpleCondition<TemperatureEvent>() {
>             @Override
>             public boolean filter(TemperatureEvent subEvent) {
>                 if (subEvent.getMachineName().equals("Device02")) {
>                     return true;
>                 }
>                 return false;
>             }
>         }).within(Time.seconds(10));
> DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
>     .select(
>         new RichPatternSelectFunction<TemperatureEvent, Alert>() {
>             /**
>              *
>              */
>             private static final long serialVersionUID = 1L;
>             @Override
>             public void open(Configuration parameters) throws Exception {
>                 System.out.println(getRuntimeContext().getUserCodeClassLoader());
>             }
>             @Override
>             public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
>                 return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
>             }
>         });
> patternStream.print();
> env.execute("CEP on Temperature Sensor");
> it should be output(on flink1.11.2):
> Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
> Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)