If you are trying to do the same CEP computation for each event type that's exactly what will happen for a KeyedStream. For example if you key by event type you can think of this like creating a separate substream for each key/eventType and then applying the CEP operations to each substream individually. Is this not what you're trying to do?
On Wed, Jan 4, 2017 at 3:32 AM, madhairsilence <harish.kum...@tcs.com> wrote: > > This is my code. > > * SplitStream<MonitoringEvent> splitStream = inputStream.split(new > OutputSelector<MonitoringEvent>() { > > @Override > public Iterable<String> select(MonitoringEvent me) { > > List<String> ml = new ArrayList<String>(); > ml.add(me.getEventType()); > return ml; > }* > I have stream of Monitoring Events coming in random order temp : 80, > pressure : 70 , humidity :80, temp:30... > > With the above code, am splitting the stream , eventType wise i.e > temperatureStream, pressureStream. > > The problem is , if I know the eventType, i can select it from the > splitStream like > > *splitStream.select('temperatureStream')* > but the eventType is dynamic and not pre-defined. > > How will I apply CEP for this dynamic stream. The CEP would be like, if the > > *temperate is > 90 for past 10 minutes ... > > pressure is > 90 for past 10 minutes ...* > > > > -- > View this message in context: http://apache-flink-mailing- > list-archive.1008284.n3.nabble.com/Categorize-or- > GroupBy-datastream-data-and-process-with-CEP-separately-tp15139p15148.html > Sent from the Apache Flink Mailing List archive. mailing list archive at > Nabble.com. > -- Jamie Grier data Artisans, Director of Applications Engineering @jamiegrier <https://twitter.com/jamiegrier> ja...@data-artisans.com