//Creating a window of ten items
/WindowedStream<ObservationEvent,Tuple,GlobalWindow> windowStream =
inputStream.keyBy("rackId").countWindow(10);/
// Applying a Window Function , adding some custom evaluating all the values
in the window
/DataStream<ObservationEvent> inactivityStream = windowStream.apply(new
WindowFunction<ObservationEvent, ObservationEvent , Tuple , GlobalWindow>()
{
@Override
public void apply(Tuple tuple, GlobalWindow timeWindow,
Iterable<ObservationEvent> itr, Collector<ObservationEvent> out)
//custom evaluation logic
out.collect(new
ObservationEvent(1,"temperature", "stable"));
}
});/
//Defining Simple CEP Pattern
/Pattern<ObservationEvent, ?> inactivityPattern =
Pattern.<ObservationEvent>begin("first")
.subtype(ObservationEvent.class)
.where(new FilterFunction<ObservationEvent>() {
@Override
public boolean filter(ObservationEvent
arg0) throws Exception {
System.out.println( arg0 );
*//This function is not at all called*
return false;
}
});/
/PatternStream<ObservationEvent> inactivityCEP =
CEP.pattern(inactivityStream.keyBy("rackId"), inactivityPattern);
/
When I run this code, the filter function inside the where clause is not at
all getting called.
I have printed the inactivityStream.print() and I can see the matching
value.
Now, when I plug in the inputStream directly without applying a window. The
pattern is matching
I printed inputStream and WindowedStream and I can see they both send
similar kind of data.
What am I missing
--
View this message in context:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DataStream-and-CEP-Pattern-not-matching-after-applying-a-window-tp15164.html
Sent from the Apache Flink Mailing List archive. mailing list archive at
Nabble.com.