Hi,

Thanks a lot for the help last time. I have a few more questions, and I am opening a new topic because the problem in the previous one was solved, thanks to useful input from the Flink community. The questions are as follows.
*1.* Which notion of time does the "within" operator work on: event time or processing time? I ask because I want to know whether a case like the following would be captured. MaxOutOfOrderness is set to 10 minutes and "within" is specified as 5 minutes. Suppose the first event has an event time of 1:00 and the corresponding next event has an event time of 1:04 but arrives in the system at 1:06. Would this still be processed, and would an alert be generated?

*2.* What happens if I don't have a key to specify? The two events are correlated by using the ctx of the first event and matching on a different id, so we can't group by a unique field. I tried a test run without specifying a key and it apparently works. But how is the shuffling done in this case?

*3.* This is one of the major issues. I could use event time with an ascending timestamp extractor for one of my Kafka topics because its behavior is consistent. But when I added another topic whose events are not in ascending order, the ascending timestamp extractor gave me timestamp monotonicity violations. When I use BoundedOutOfOrdernessTimestampExtractor instead, I no longer get any warnings, but I also no longer get my alerts. If I go back to processing time, I get alerts properly again. What could be the problem here?
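To make the scenario in question 1 concrete, here is a minimal sketch of the arithmetic as I currently understand it. It is plain Java with no Flink dependency, and the class and method names (WatermarkSketch, watermark, isLate, withinWindow, combined) are made up for illustration. It assumes that a bounded-out-of-orderness watermark is the highest event timestamp seen so far minus the allowed out-of-orderness, that "within" compares event timestamps when event time is enabled, and that a source reading several inputs advances only as fast as its slowest input. These are my assumptions, not confirmed behavior, and the exact boundary handling in Flink may differ.

```java
public class WatermarkSketch {

    // Converts a wall-clock style "h:mm" value to milliseconds since midnight.
    public static long ms(int hours, int minutes) {
        return (hours * 60L + minutes) * 60_000L;
    }

    // Assumed model: watermark = highest event timestamp seen so far
    // minus the allowed out-of-orderness.
    public static long watermark(long maxSeenTimestampMs, long maxOutOfOrdernessMs) {
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    // In this sketch an event counts as late if the watermark has already
    // reached or passed its event timestamp.
    public static boolean isLate(long eventTimestampMs, long watermarkMs) {
        return eventTimestampMs <= watermarkMs;
    }

    // Assumed model of "within": the gap between the two event timestamps
    // must be smaller than the window. Arrival time plays no role here.
    public static boolean withinWindow(long firstMs, long secondMs, long windowMs) {
        return secondMs - firstMs < windowMs;
    }

    // Assumed model for several inputs (for example several topics):
    // the combined watermark is the minimum of the individual watermarks.
    public static long combined(long... watermarksMs) {
        long min = Long.MAX_VALUE;
        for (long w : watermarksMs) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        long first = ms(1, 0);    // event time of the first event
        long second = ms(1, 4);   // event time of the second event (arrives at 1:06)
        long bound = ms(0, 10);   // MaxOutOfOrderness
        long window = ms(0, 5);   // "within"

        long wm = watermark(first, bound);
        System.out.println("watermark after first event (ms): " + wm);
        System.out.println("second event late: " + isLate(second, wm));
        System.out.println("gap inside window: " + withinWindow(first, second, window));
        System.out.println("combined watermark (ms): " + combined(ms(1, 0), ms(0, 30)));
    }
}
```

Under these assumptions the 1:06 arrival time never enters the calculation, which is exactly the part I would like to have confirmed. The combined() case relates to question 3: if one topic's timestamps lag behind the others, the combined watermark would lag as well.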
*This is the code I am using:*

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.PatternTimeoutFunction;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.IterativeCondition;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.types.Either;

    // BAMEvent, StringSerializerToEvent, ReadEventType, StatusChangedEventType
    // and getDefaultProperties(...) are our own classes/helpers, not shown here.
    public class CEPForBAM {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            System.out.println(env.getStreamTimeCharacteristic());
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.getConfig().setAutoWatermarkInterval(10000);

            // configure Kafka consumer
            Properties props = new Properties();
            props = getDefaultProperties(props);

            FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
                    Arrays.asList("topic1", "topic_x", "topic_test"),
                    new StringSerializerToEvent(),
                    props);

            // Event-time timestamps, allowing events to be up to 60 seconds out of order
            kafkaSource.assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {
                        private static final long serialVersionUID = -7228487240278428374L;

                        @Override
                        public long extractTimestamp(BAMEvent event) {
                            return event.getTimestamp();
                        }
                    });

            // Input stream of monitoring events
            DataStream<BAMEvent> events = env.addSource(kafkaSource);

            /* DataStream<BAMEvent> partitionedInput = events
                    .keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId); */

            events.print();
            //partitionedInput.print();

            // "first": a read event; "second": a status-changed event whose
            // correlation ID contains the event ID of a "first" event
            Pattern<BAMEvent, ?> pattern = Pattern.<BAMEvent>begin("first")
                    .where(new SimpleCondition<BAMEvent>() {
                        private static final long serialVersionUID = 1390448281048961616L;

                        @Override
                        public boolean filter(BAMEvent event) throws Exception {
                            return event.getEventName().equals(ReadEventType.class.getSimpleName());
                        }
                    })
                    .followedBy("second")
                    .where(new IterativeCondition<BAMEvent>() {
                        private static final long serialVersionUID = -9216505110246259082L;

                        @Override
                        public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
                            if (secondEvent.getEventName().equals(StatusChangedEventType.class.getSimpleName())) {
                                for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
                                    if (secondEvent.getCorrelationID().contains(firstEvent.getEventId())) {
                                        return true;
                                    }
                                }
                            }
                            return false;
                        }
                    })
                    .within(Time.minutes(10));

            // No keyBy: the pattern is applied to the non-keyed stream
            PatternStream<BAMEvent> patternStream = CEP.pattern(events, pattern);

            // Left = timed-out partial matches, Right = complete matches
            DataStream<Either<String, String>> alerts = patternStream.select(
                    new PatternTimeoutFunction<BAMEvent, String>() {
                        private static final long serialVersionUID = -8717561187522704500L;

                        @Override
                        public String timeout(Map<String, List<BAMEvent>> map, long l) throws Exception {
                            return "TimedOut: " + map.toString() + " @ " + l;
                        }
                    },
                    new PatternSelectFunction<BAMEvent, String>() {
                        private static final long serialVersionUID = 3144439966791408980L;

                        @Override
                        public String select(Map<String, List<BAMEvent>> pattern) throws Exception {
                            BAMEvent bamEvent = pattern.get("first").get(0);
                            return "Matched Events: " + bamEvent.getEventId() + "_" + bamEvent.getEventName();
                        }
                    });

            alerts.print();

            env.execute("CEP monitoring job");
        }
    }

Even when I am using event time, I am still receiving events from Kafka, as the output of events.print() shows.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Queries-regarding-FlinkCEP-tp13454.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.