Hi Norman, could you provide me with an example input data set that produces the error? For example, the list of strings you inserted into Kafka/read from Kafka?
Cheers, Till On Thu, Apr 7, 2016 at 11:05 AM, norman sp <wir12...@studserv.uni-leipzig.de > wrote: > Hi Till, > thank you. here's the code: > > public class CepStorzSimulator { > > public static void main(String[] args) throws Exception { > > final ParameterTool parameterTool = > ParameterTool.fromArgs(args); > > if(parameterTool.getNumberOfParameters() < 3) { > System.out.println("Missing > parameters!\nUsage: Kafka --topic <topic> > --bootstrap.servers <kafka brokers> --group.id <some id>"); > System.exit(1); > } > > CepStorzSimulator reader = new CepStorzSimulator(); > reader.run(parameterTool); > } > > public void run(ParameterTool parameterTool) throws Exception { > > String topic = "test-simulator"; > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > //env.getConfig().disableSysoutLogging(); > > env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, > 5000)); > //env.enableCheckpointing(15000); > // create a checkpoint every 5 > seconds > env.setParallelism(4); > > env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); > > DataStream<String> kafkaStream = env.addSource(new > FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), > parameterTool.getProperties())); > > DataStream<Tuple5<String, String, String, String, Double>> data > = > kafkaStream.flatMap(new SplitMapper()); > > SingleOutputStreamOperator<Tuple6<String, String, String, > Double, > Double, Double>> windowedData = > data.filter(new > FilterFunction<Tuple5<String, String, String, > String, Double>>() { > > private static final long > serialVersionUID = -5952425756492833594L; > > @Override > public boolean > filter(Tuple5<String, String, String, String, Double> > val) throws Exception { > > return > val.f3.contains("target - Value"); > } > }) > .keyBy(3) > .timeWindow(Time.seconds(10), > Time.seconds(1)) > .fold(new Tuple6<>("", "", "", > 0.0d, 0.0d, 0.0d), new > pressureElementCount()); > > windowedData.print(); > > 
Pattern<Tuple6<String, String, String, Double, Double, > Double>, ?> > FlowFirstPattern = > Pattern.<Tuple6<String, String, String, > Double, Double, > Double>>begin("FlowOver10") > .where(new FilterFunction<Tuple6<String, > String, String, Double, > Double, Double>>() { > > private static final long serialVersionUID > = 5861517245439863889L; > > @Override > public boolean > filter(Tuple6<String, String, String, Double, Double, > Double> value) throws Exception { > > double avgFlow= > (value.f5/value.f4); > > return > value.f2.contains("Flow target - Value") && avgFlow > 25.0;// > && (value.f2 > avgFlow*1.0); > } > }) > .followedBy("PressureOver10").where(new > FilterFunction<Tuple6<String, String, String, Double, Double, Double>>() > { > > private static final long > serialVersionUID = -4037517308930307522L; > > @Override > public boolean > filter(Tuple6<String, String, String, Double, Double, > Double> value) throws Exception { > > double avgPressure = > (value.f5/value.f4); > > //System.out.println("Pressure: " + avgPressure); > > return > value.f2.equals("Pressure target - Value") && (avgPressure > > 5.0);// && (value.f2 > avgPressure*1.0); > } > }) > .within(Time.seconds(10)); > > PatternStream<Tuple6<String, String, String, Double, > Double, Double>> > FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern); > DataStream<String> warning = > FlowFirstPatternStream.select(new > PlacingWorkingTrocarWarning()); > warning.print(); > > env.execute(); > } > > private static class PlacingWorkingTrocarWarning implements > PatternSelectFunction<Tuple6<String, String, String, Double, Double, > Double>, String> { > > private static final long serialVersionUID = > 2576609635170800026L; > > @Override > public String select(Map<String, Tuple6<String, String, > String, Double, > Double, Double>> pat) throws Exception { > > //Tuple5<String, String, Double, Double, Double> > pressure = > pat.get("PressureOver10"); > //Tuple5<String, String, Double, Double, Double> 
> flow = > pat.get("FlowOver10"); > > return " ####### Warning! FlowEvent ####### "; > } > } > > private static class pressureElementCount implements > FoldFunction<Tuple5<String, String, String, String, Double>, > Tuple6<String, String, String, Double, Double, Double>>{ > > private static final long serialVersionUID = > -1081752808506520154L; > > @Override > public Tuple6<String, String, String, Double, Double, > Double> > fold(Tuple6<String, String, String, Double, Double, Double> init, > Tuple5<String, String, String, String, Double> val) throws Exception { > > double count = init.f4+1.0d; > double sum = init.f5+val.f4; //!!! > return new Tuple6<>(val.f0, val.f1, val.f3, > val.f4, count, sum); > } > } > > private static class SplitMapper extends > RichFlatMapFunction<String, > Tuple5<String, String, String, String, Double>> { > > private static final long serialVersionUID = > 7297664214330222193L; > > @Override > public void flatMap(String msg, > Collector<Tuple5<String, String, > String, String, Double>> out) throws Exception { > > EncodeValues enc = new EncodeValues(); > > getRuntimeContext().getLongCounter("eventCount").add(1L); > String[] split_msg = msg.split("\t"); > String DeviceId = split_msg[1]; > String [] array = split_msg[3].split(", \""); > > for(String a:array){ > > String[] split = a.split(":"); > String val = split[1]; > String testname = "test1"; > > String nom = val.replace("\"", > "").replace("{", "").replace("}", > "").replace(",", "."); > String param = > split[0].replace("\"", "").replace("{", "").replace("}", > ""); > > double codedVal = enc.encode(nom); > > out.collect(new Tuple5<String, > String, String, String, > Double>(UUID.randomUUID().toString(), testname, DeviceId, param, > codedVal)); > } > } > } > } > > > Example data looks like this: > 1> 00:36:06.459 1 2121 {"Pressure target - Value":"6", "Pressure > target - > Unit":"mmHg"} > 1> 00:36:06.463 1 2121 {"Flow target - Value":"23", "Flow target - > Unit":"l/min"} > > > > > > -- > 
View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948p5986.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. >