[ https://issues.apache.org/jira/browse/FLINK-20814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256341#comment-17256341 ]
little-tomato commented on FLINK-20814: --------------------------------------- my pom file: <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_2.12</artifactId> <version>1.12.0</version> </dependency> </dependencies> > The CEP code is not running properly > ------------------------------------ > > Key: FLINK-20814 > URL: https://issues.apache.org/jira/browse/FLINK-20814 > Project: Flink > Issue Type: Bug > Components: Library / CEP > Affects Versions: 1.12.0 > Environment: flink1.12.0 > jdk1.8 > Reporter: little-tomato > Priority: Blocker > > The cep code is running properly on flink1.11.2,but it is not working > properly on flink1.12.0. > Can somebody help me? > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // DataStream : source > DataStream<TemperatureEvent> input = env.fromElements(new > TemperatureEvent(1,"Device01", 22.0), > new TemperatureEvent(1,"Device01", 27.1), new > TemperatureEvent(2,"Device01", 28.1), > new TemperatureEvent(1,"Device01", 22.2), new > TemperatureEvent(3,"Device01", 22.1), > new TemperatureEvent(1,"Device02", 22.3), new > TemperatureEvent(4,"Device02", 22.1), > new TemperatureEvent(1,"Device02", 22.4), new > TemperatureEvent(5,"Device02", 22.7), > new TemperatureEvent(1,"Device02", 27.0), new > TemperatureEvent(6,"Device02", 30.0)); > > Pattern<TemperatureEvent, ?> warningPattern = > Pattern.<TemperatureEvent>begin("start") > .subtype(TemperatureEvent.class) > .where(new SimpleCondition<TemperatureEvent>() { > @Override > public boolean filter(TemperatureEvent subEvent) { > if (subEvent.getTemperature() >= 26.0) { > return true; > } > return false; > } > }).where(new SimpleCondition<TemperatureEvent>() { > @Override > public boolean filter(TemperatureEvent subEvent) { > if (subEvent.getMachineName().equals("Device02")) { > return true; > } > return false; > } > }).within(Time.seconds(10)); > DataStream<Alert> patternStream = CEP.pattern(input, warningPattern) > .select( > new RichPatternSelectFunction<TemperatureEvent, > Alert>() { > /** > * > */ > private static final > long serialVersionUID = 1L; > @Override > public void > open(Configuration parameters) throws Exception { > > System.out.println(getRuntimeContext().getUserCodeClassLoader()); > } > @Override > public Alert select(Map<String, > List<TemperatureEvent>> event) throws Exception { > > return new Alert("Temperature Rise Detected: > " + event.get("start") + " on machine name: " + event.get("start")); > } > }); > patternStream.print(); > env.execute("CEP on Temperature Sensor"); > it should be output(on flink1.11.2): > Alert [message=Temperature Rise Detected: [TemperatureEvent > [getTemperature()=27.0, getMachineName=Device02]] on machine name: > [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]] > Alert [message=Temperature Rise Detected: [TemperatureEvent > [getTemperature()=30.0, getMachineName=Device02]] on machine name: > [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]] -- This message was sent by Atlassian Jira (v8.3.4#803005)