+Kostas and +Dawid, could you please have a look? You two have worked on these parts most recently. I recall that there were some problems with event time and out-of-order processing in CEP in Flink 1.2.
Best,
Aljoscha

> On 19. Apr 2017, at 15:28, Luis Lázaro <lalaz...@keedio.com> wrote:
>
> Hi everyone,
> I am working on a use case with CEP and Flink:
>
> - Flink 1.2
> - The source is Kafka, configured with a single partition.
> - Data are standard syslog messages parsed as LogEntry (an object with attributes like timestamp, service, severity, etc.).
> - An event is a LogEntry.
> - If two consecutive LogEntry events with severity ERROR (3) and the same service are matched within a 10-minute period, an ErrorAlert must be triggered.
>
> Although I cannot guarantee the ascending order of events (LogEntry) when consuming from Kafka, I decided to try this implementation:
> Timestamps per Kafka partition
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition>
>
> // My events provide their own timestamps
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> // "Watermarks are generated inside the Kafka consumer, per Kafka partition":
> val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
>   parameterTool.getRequired("topic"),
>   new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
>   parameterTool.getProperties)
>
> // may not be in ascending order
> val kafkaSourceAssignedTimestamp =
>   kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[LogEntry] {
>     override def extractAscendingTimestamp(t: LogEntry): Long = {
>       ProcessorHelper.toTimestamp(t.timestamp).getTime
>     }
>   })
>
> val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimestamp)
>
> I implemented a pattern like:
>
> myPattern =
>   Pattern
>     .begin[LogEntry]("First Event")
>     .subtype(classOf[LogEntry])
>     .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>     .next("Second Event")
>     .subtype(classOf[LogEntry])
>     .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>     .within(Time.minutes(10))
>
> This pattern will trigger an alert when two consecutive LogEntry events with severity ERROR and the same service are matched (alerts are generated for each service individually):
>
> CEP.pattern(stream
>   .keyBy(_.service),
>   myPattern)
>
> An alert is made of two LogEntry events:
>
> ErrorAlert:
> service_name-ERROR-timestamp first event
> service_name-ERROR-timestamp second event
>
> I am getting alerts like this:
>
> ErrorAlert:
> service_2-3-2017-04-19 06:57:49
> service_2-3-2017-04-19 07:02:23
>
> ErrorAlert:
> service_2-3-2017-04-19 07:32:37
> service_2-3-2017-04-19 07:34:06
>
> ErrorAlert:
> service_1-3-2017-04-19 07:25:04
> service_1-3-2017-04-19 07:29:39
>
> ErrorAlert:
> service_1-3-2017-04-19 07:29:39
> service_1-3-2017-04-19 07:30:37
>
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> service_3-3-2017-04-19 06:59:10 ---> oops!
>
> ErrorAlert:
> service_2-3-2017-04-19 07:50:06
> service_2-3-2017-04-19 06:54:48 ---> oops!
>
> ErrorAlert:
> service_2-3-2017-04-19 06:54:48
> service_2-3-2017-04-19 06:55:03
>
> ErrorAlert:
> service_3-3-2017-04-19 07:21:11
> service_3-3-2017-04-19 07:24:52
>
> ErrorAlert:
> service_1-3-2017-04-19 07:30:05
> service_1-3-2017-04-19 07:31:33
>
> ErrorAlert:
> service_3-3-2017-04-19 07:08:31
> service_3-3-2017-04-19 07:17:42
>
> ErrorAlert:
> service_1-3-2017-04-19 07:02:30
> service_1-3-2017-04-19 07:06:58
>
> ErrorAlert:
> service_3-3-2017-04-19 07:03:50
> service_3-3-2017-04-19 07:11:48
>
> ErrorAlert:
> service_3-3-2017-04-19 07:19:04
> service_3-3-2017-04-19 07:21:25
>
> ErrorAlert:
> service_3-3-2017-04-19 07:33:13
> service_3-3-2017-04-19 07:38:47
>
> I also tried this approach:
> bounded out-of-orderness
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness>
>
> kafkaSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
>   override def extractTimestamp(t: LogEntry): Long = {
>     ProcessorHelper.toTimestamp(t.timestamp).getTime
>   }
> })
>
> Time.seconds(0) --> if I set it like this, do I prevent the events from being delivered late?
>
> But I get the same problem as described above:
>
> ......
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> service_3-3-2017-04-19 06:59:10 ---> oops!
>
> ErrorAlert:
> service_2-3-2017-04-19 07:50:06
> service_2-3-2017-04-19 06:54:48 ---> oops!
> ......
>
> Initially I thought my pattern was not correctly implemented, but the problem seems to be that I am unable to assign timestamps, and consequently emit watermarks, properly when events are unordered.
>
> Any suggestion is well appreciated, thanks in advance.
>
> Best regards,
> Luis
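[Editor's note] The behaviour described above is consistent with how a bounded out-of-orderness assigner derives watermarks: the watermark trails the highest timestamp seen so far by the configured slack, so with `Time.seconds(0)` the watermark equals the maximum timestamp and any out-of-order element is already behind it. A minimal, Flink-free Scala sketch of that logic (the names `Assigner` and `isLate` are illustrative, not Flink's API, and the late-element rule is a simplification of Flink's actual semantics):

```scala
object WatermarkSketch {
  // Watermark = highest timestamp seen so far, minus the allowed out-of-orderness.
  final case class Assigner(maxOutOfOrdernessMs: Long) {
    private var maxTs: Long = Long.MinValue
    def onEvent(ts: Long): Unit = { maxTs = math.max(maxTs, ts) }
    def currentWatermark: Long = maxTs - maxOutOfOrdernessMs
    // Simplified rule: an element whose timestamp is at or below
    // the current watermark is treated as late.
    def isLate(ts: Long): Boolean = ts <= currentWatermark
  }

  def main(args: Array[String]): Unit = {
    // With a 0 ms bound, the watermark keeps pace with the newest event,
    // so any out-of-order element is immediately late.
    val strict = Assigner(0L)
    strict.onEvent(1000L)        // watermark advances to 1000
    assert(strict.isLate(400L))  // an earlier event arriving now is late

    // With a 10-minute bound, the watermark lags far enough behind
    // that the same out-of-order element is still on time.
    val relaxed = Assigner(10 * 60 * 1000L)
    relaxed.onEvent(1000L)       // watermark = 1000 - 600000
    assert(!relaxed.isLate(400L))
    println("ok")
  }
}
```

Under this model, a slack of zero cannot "prevent" lateness; it only declares that no out-of-order elements are expected, which is exactly what the unordered Kafka stream above violates.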