Hi Kieran,

Which state backend are you using for your CEP job? Using RocksDB as the state backend could potentially fix the issue, because it keeps keyed state off the JVM heap (in native memory and on local disk). How many distinct keys are in your stream?
On Tue, Nov 29, 2016 at 3:18 PM, kieran . <kieran0...@hotmail.com> wrote:
> Hello,
>
> I am currently building a multi-tenant monitoring application and exploring
> the effectiveness of different Complex Event Processors (CEP) and whether or
> not this would be a potential solution for what I want to achieve. I have
> created a small test application which utilises Flink and its CEP library, but
> I have come across some issues when dealing with a large number of metrics to
> monitor using patterns/pattern streams. Flink seems to operate as expected with
> one or several patterns, each consuming its own PatternStream, but as soon as
> more are introduced, Flink's memory usage seems to rise rather quickly and it
> eventually throws an OutOfMemoryError. My initial idea was to create one
> pattern/pattern stream for each metric that I need to monitor, but there could
> be many thousands of these.
>
> I create the PatternStream per Pattern like this to monitor a metric:
>
>     Pattern<MetricData, ?> pattern = Pattern.<MetricData>begin(patternName)
>         .subtype(MetricData.class)
>         .where(evt -> evt.getValues().get("max").longValue() > 50.0
>             && evt.account_id.equals(accountName));
>
>     check.withPattern(pattern)
>         .withTimePeriod(Integer.valueOf(1))
>         .withCooldown(Integer.valueOf(1))
>         .withName(checkName)
>         .withAlertStatus(AlertStatus.OK)
>         .setPatternStream(CEP.pattern(messageStream.keyBy("account_id"), pattern));
>
> To trigger these patterns, I use:
>
>     PatternSelectFunction<MetricData, MetricWarning> psf =
>         new PatternSelectFunction<MetricData, MetricWarning>() {
>             @Override
>             public MetricWarning select(Map<String, MetricData> map) throws Exception {
>                 return new MetricWarning(map.get(patternKey), name, accountId);
>             }
>         };
>
>     try {
>         check.getPatternStream().select(psf);
>     } catch (Exception exception) {
>         exception.printStackTrace();
>     }
>
> The pattern in the above example is tied to a specific stream, which results in
> one stream per pattern, and this seems to be an issue with this approach. If it
> were possible to run one pattern stream and swap out the patterns when needed,
> then perhaps this would be a viable solution. Am I approaching this the right
> way by creating a stream for each pattern?
>
> Would it be possible to create a set of Pattern processors that could be run
> against a single PatternStream, or is there anything you could suggest that
> would allow me to do this with Flink?
>
> Thanks,
> - Kieran
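On the question of running a set of rules against a single stream: the quoted pattern has only a single begin() state with a where() condition, so it tests one event at a time and does not use CEP's sequence or time-window matching. For checks of that shape, one option (a swapped-in technique, not the Flink CEP API) is to keep all thresholds in one rule table and evaluate each event against it inside a single operator on messageStream.keyBy("account_id"), for example a flatMap, instead of creating one PatternStream per metric. The plain-Java sketch shows only that evaluation logic. MetricData and MetricWarning here are simplified stand-ins for the types in the quoted code, and ThresholdRule and RuleSet are hypothetical names. How updated rules would reach the operator at runtime (for example via a second control stream joined with connect()) is an assumption left out of the sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RuleSetSketch {

    // Simplified stand-in for the MetricData type in the quoted code (hypothetical fields).
    static final class MetricData {
        final String accountId;
        final Map<String, Double> values;

        MetricData(String accountId, Map<String, Double> values) {
            this.accountId = accountId;
            this.values = values;
        }
    }

    // Hypothetical single-event rule: fires when values[field] > threshold.
    static final class ThresholdRule {
        final String name;
        final String accountId;
        final String field;
        final double threshold;

        ThresholdRule(String name, String accountId, String field, double threshold) {
            this.name = name;
            this.accountId = accountId;
            this.field = field;
            this.threshold = threshold;
        }

        boolean matches(MetricData evt) {
            Double v = evt.values.get(field);
            return v != null && v > threshold;
        }
    }

    // Simplified stand-in for MetricWarning (hypothetical fields).
    static final class MetricWarning {
        final String ruleName;
        final String accountId;
        final double value;

        MetricWarning(String ruleName, String accountId, double value) {
            this.ruleName = ruleName;
            this.accountId = accountId;
            this.value = value;
        }
    }

    // Holds every tenant's rules in one place, indexed by account, so each event
    // is only checked against the rules of its own account.
    static final class RuleSet {
        private final Map<String, List<ThresholdRule>> byAccount = new HashMap<>();

        void add(ThresholdRule rule) {
            byAccount.computeIfAbsent(rule.accountId, k -> new ArrayList<>()).add(rule);
        }

        // Rules can be removed or replaced without creating a new stream.
        void remove(String accountId, String ruleName) {
            List<ThresholdRule> rules = byAccount.get(accountId);
            if (rules != null) {
                rules.removeIf(r -> r.name.equals(ruleName));
            }
        }

        List<MetricWarning> evaluate(MetricData evt) {
            List<MetricWarning> warnings = new ArrayList<>();
            List<ThresholdRule> rules =
                byAccount.getOrDefault(evt.accountId, Collections.emptyList());
            for (ThresholdRule rule : rules) {
                if (rule.matches(evt)) {
                    warnings.add(new MetricWarning(rule.name, evt.accountId, evt.values.get(rule.field)));
                }
            }
            return warnings;
        }
    }

    public static void main(String[] args) {
        RuleSet rules = new RuleSet();
        rules.add(new ThresholdRule("max-above-50", "acme", "max", 50.0));
        rules.add(new ThresholdRule("max-above-90", "acme", "max", 90.0));
        rules.add(new ThresholdRule("max-above-50", "globex", "max", 50.0));

        Map<String, Double> values = new HashMap<>();
        values.put("max", 72.0);
        for (MetricWarning w : rules.evaluate(new MetricData("acme", values))) {
            System.out.println(w.ruleName + " " + w.accountId + " " + w.value);
        }
    }
}
```

Running main prints `max-above-50 acme 72.0`, since only the first of the two acme thresholds is exceeded and the globex rule is never consulted for an acme event. The trade-off is that this only covers single-event threshold checks; rules that need sequences of events or time windows would still need CEP patterns.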