[ 
https://issues.apache.org/jira/browse/FLINK-21311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280391#comment-17280391
 ] 

forrest_feng commented on FLINK-21311:
--------------------------------------

package com.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

public class CEPTest {
 private static Logger log = (Logger) LoggerFactory.getLogger(CEPTest.class);



 public static class Event {
 private Integer id;
 private String name;

 public Event(Integer id, String name) {
 this.id = id;
 this.name = name;
 }


 public Integer getId() {
 return id;
 }

 public void setId(Integer id) {
 this.id = id;
 }

 public String getName() {
 return name;
 }

 public void setName(String name) {
 this.name = name;
 }
 }


 public static void main(String[] args) throws Exception{



 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

 SingleOutputStreamOperator<Event> eventDataStream = 
env.socketTextStream("127.0.0.1", 9999)
 .flatMap(new FlatMapFunction<String, Event>() {
 @Override
 public void flatMap(String s, Collector<Event> collector) throws Exception {
 //if (StringUtil.isNotEmpty(s)) {
 String[] split = s.split(",");
 if (split.length == 2) {
 collector.collect(new Event(Integer.valueOf(split[0]), split[1]));
 }
 // }
 }
 });



 Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
 new SimpleCondition<Event>() {
 @Override
 public boolean filter(Event event) {
 System.out.println(event.getId()+"======");
 log.info("start {}", event.getId());
 return event.getId() == 42;
 }
 }
 ).next("middle").where(
 new SimpleCondition<Event>() {
 @Override
 public boolean filter(Event event) {
 log.info("middle {}", event.getId());
 return event.getId() >= 10;
 }
 }
 );

 CEP.pattern(eventDataStream, pattern).select(new PatternSelectFunction<Event, 
String>() {
 @Override
 public String select(Map<String, List<Event>> p) throws Exception {
 StringBuilder builder = new StringBuilder();
 log.info("p = {}", p);
 
builder.append(p.get("start").get(0).getId()).append(",").append(p.get("start").get(0).getName()).append("\n")
 
.append(p.get("middle").get(0).getId()).append(",").append(p.get("middle").get(0).getName());
 return builder.toString();
 }
 }).print();//打印结果



 env.execute("flink learning cep");







 }



}

> flink1.12  cep does not execute pattern but flink1.10 is ok 
> ------------------------------------------------------------
>
>                 Key: FLINK-21311
>                 URL: https://issues.apache.org/jira/browse/FLINK-21311
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.12.0
>            Reporter: forrest_feng
>            Priority: Major
>         Attachments: CEPTest.java
>
>
> I am trying to test CEP demo according to  Official website example, I found 
> no data enter the pattern to print log , finally, I  replace my  FLINK 
> version from 1.12 to 1.10, Flink 1.10 is ok, may be this is  bug of Flink 
> 1.12 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to