
forrest_feng commented on FLINK-21311:

package com.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

public class CEPTest {
 private static Logger log = (Logger) LoggerFactory.getLogger(CEPTest.class);

 public static class Event {
 private Integer id;
 private String name;

 public Event(Integer id, String name) {
 this.id = id;
 this.name = name;

 public Integer getId() {
 return id;

 public void setId(Integer id) {
 this.id = id;

 public String getName() {
 return name;

 public void setName(String name) {
 this.name = name;

 public static void main(String[] args) throws Exception{

 StreamExecutionEnvironment env = 

 SingleOutputStreamOperator<Event> eventDataStream = 
env.socketTextStream("", 9999)
 .flatMap(new FlatMapFunction<String, Event>() {
 public void flatMap(String s, Collector<Event> collector) throws Exception {
 //if (StringUtil.isNotEmpty(s)) {
 String[] split = s.split(",");
 if (split.length == 2) {
 collector.collect(new Event(Integer.valueOf(split[0]), split[1]));
 // }

 Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
 new SimpleCondition<Event>() {
 public boolean filter(Event event) {
 log.info("start {}", event.getId());
 return event.getId() == 42;
 new SimpleCondition<Event>() {
 public boolean filter(Event event) {
 log.info("middle {}", event.getId());
 return event.getId() >= 10;

 CEP.pattern(eventDataStream, pattern).select(new PatternSelectFunction<Event, 
String>() {
 public String select(Map<String, List<Event>> p) throws Exception {
 StringBuilder builder = new StringBuilder();
 log.info("p = {}", p);
 return builder.toString();

 env.execute("flink learning cep");



> flink1.12  cep does not execute pattern but flink1.10 is ok 
> ------------------------------------------------------------
>                 Key: FLINK-21311
>                 URL: https://issues.apache.org/jira/browse/FLINK-21311
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.12.0
>            Reporter: forrest_feng
>            Priority: Major
>         Attachments: CEPTest.java
> I am trying to test CEP demo according to  Official website example, I found 
> no data enter the pattern to print log , finally, I  replace my  FLINK 
> version from 1.12 to 1.10, Flink 1.10 is ok, may be this is  bug of Flink 
> 1.12 

This message was sent by Atlassian Jira

Reply via email to