[ 
https://issues.apache.org/jira/browse/FLINK-20814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17259471#comment-17259471
 ] 

Yao Zhang commented on FLINK-20814:
-----------------------------------

Hi [~lprince]

I reproduced this issue with no output for patternStream, however it works as 
expected in Flink 1.11.

This problem was solved by adding inProcessingTime() after invoking 
CEP.pattern. As shown below:

DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
 .inProcessingTime() // Enable processing time
 .select(
 new RichPatternSelectFunction<TemperatureEvent, Alert>() {
 /**

 */
 private static final long serialVersionUID = 1L;

 @Override
 public void open(Configuration parameters) throws Exception {
 System.out.println(getRuntimeContext().getUserCodeClassLoader());
 }

 @Override
 public Alert select(Map<String, List<TemperatureEvent>> event) throws 
Exception {
 return new Alert("Temperature Rise Detected: " + event.get("start") + " on 
machine name: " + event.get("start"));
 }
 });

 

The trick here is that since Flink 1.12 the default stream time characteristic 
has been changed to EventTime. Thus according to your snippet, it will wait 
until all elements whose event times are within 10 seconds have all arrived. 
That's the reason why you got no output.

 

> The CEP code is not running properly
> ------------------------------------
>
>                 Key: FLINK-20814
>                 URL: https://issues.apache.org/jira/browse/FLINK-20814
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.12.0
>         Environment: flink1.12.0
> jdk1.8
>            Reporter: little-tomato
>            Priority: Blocker
>
> The cep code is running properly on flink1.11.2,but it is not working 
> properly on flink1.12.0.
> Can somebody help me?
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         // DataStream : source
>               DataStream<TemperatureEvent> input = env.fromElements(new 
> TemperatureEvent(1,"Device01", 22.0),
>                 new TemperatureEvent(1,"Device01", 27.1), new 
> TemperatureEvent(2,"Device01", 28.1),
>                 new TemperatureEvent(1,"Device01", 22.2), new 
> TemperatureEvent(3,"Device01", 22.1),
>                 new TemperatureEvent(1,"Device02", 22.3), new 
> TemperatureEvent(4,"Device02", 22.1),
>                 new TemperatureEvent(1,"Device02", 22.4), new 
> TemperatureEvent(5,"Device02", 22.7),
>                 new TemperatureEvent(1,"Device02", 27.0), new 
> TemperatureEvent(6,"Device02", 30.0));
>         
>         Pattern<TemperatureEvent, ?> warningPattern = 
> Pattern.<TemperatureEvent>begin("start")
>                 .subtype(TemperatureEvent.class)
>                 .where(new SimpleCondition<TemperatureEvent>() {
>                       @Override
>                     public boolean filter(TemperatureEvent subEvent) {
>                         if (subEvent.getTemperature() >= 26.0) {
>                             return true;
>                         }
>                         return false;
>                     }
>                 }).where(new SimpleCondition<TemperatureEvent>() {
>                       @Override
>                       public boolean filter(TemperatureEvent subEvent) {
>                         if (subEvent.getMachineName().equals("Device02")) {
>                             return true;
>                         }
>                         return false;
>                     }
>                 }).within(Time.seconds(10));
>         DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
>                 .select(
>                         new RichPatternSelectFunction<TemperatureEvent, 
> Alert>() {
>                             /**
>                                                        * 
>                                                        */
>                                                       private static final 
> long serialVersionUID = 1L;
>                                                       @Override
>                                                       public void 
> open(Configuration parameters) throws Exception {
>                                                               
> System.out.println(getRuntimeContext().getUserCodeClassLoader());
>                                                       }
>                                                       @Override
>                             public Alert select(Map<String, 
> List<TemperatureEvent>> event) throws Exception {
>                               
>                                 return new Alert("Temperature Rise Detected: 
> " + event.get("start") + " on machine name: " + event.get("start"));
>                             }
>                         });
>         patternStream.print();
>         env.execute("CEP on Temperature Sensor");
> it should be output(on flink1.11.2):
> Alert [message=Temperature Rise Detected: [TemperatureEvent 
> [getTemperature()=27.0, getMachineName=Device02]] on machine name: 
> [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
> Alert [message=Temperature Rise Detected: [TemperatureEvent 
> [getTemperature()=30.0, getMachineName=Device02]] on machine name: 
> [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to