[ https://issues.apache.org/jira/browse/FLINK-21311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280391#comment-17280391 ]
forrest_feng commented on FLINK-21311: -------------------------------------- package com.flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternSelectFunction; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; public class CEPTest { private static Logger log = (Logger) LoggerFactory.getLogger(CEPTest.class); public static class Event { private Integer id; private String name; public Event(Integer id, String name) { this.id = id; this.name = name; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } } public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator<Event> eventDataStream = env.socketTextStream("127.0.0.1", 9999) .flatMap(new FlatMapFunction<String, Event>() { @Override public void flatMap(String s, Collector<Event> collector) throws Exception { //if (StringUtil.isNotEmpty(s)) { String[] split = s.split(","); if (split.length == 2) { collector.collect(new Event(Integer.valueOf(split[0]), split[1])); } // } } }); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where( new SimpleCondition<Event>() { @Override public boolean filter(Event event) { System.out.println(event.getId()+"======"); log.info("start {}", event.getId()); return event.getId() == 42; } } ).next("middle").where( new SimpleCondition<Event>() { @Override public boolean filter(Event event) { log.info("middle {}", event.getId()); return event.getId() >= 10; } } ); CEP.pattern(eventDataStream, pattern).select(new PatternSelectFunction<Event, String>() { @Override public String select(Map<String, List<Event>> p) throws Exception { StringBuilder builder = new StringBuilder(); log.info("p = {}", p); builder.append(p.get("start").get(0).getId()).append(",").append(p.get("start").get(0).getName()).append("\n") .append(p.get("middle").get(0).getId()).append(",").append(p.get("middle").get(0).getName()); return builder.toString(); } }).print();//打印结果 env.execute("flink learning cep"); } } > flink1.12 cep does not execute pattern but flink1.10 is ok > ------------------------------------------------------------ > > Key: FLINK-21311 > URL: https://issues.apache.org/jira/browse/FLINK-21311 > Project: Flink > Issue Type: Bug > Components: Library / CEP > Affects Versions: 1.12.0 > Reporter: forrest_feng > Priority: Major > Attachments: CEPTest.java > > > I am trying to test CEP demo according to Official website example, I found > no data enter the pattern to print log , finally, I replace my FLINK > version from 1.12 to 1.10, Flink 1.10 is ok, may be this is bug of Flink > 1.12 -- This message was sent by Atlassian Jira (v8.3.4#803005)