Hi, Dawid. Thanks for replying; happy to know you are working on this.
On 2021/07/08 12:14:21, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> Hi,
>
> Sorry for the late reply.
>
> Indeed I found a couple of problems with clearing the state for
> short-lived keys. I created a JIRA issue[1] to track it and opened a PR
> (which needs test coverage before it can be merged) with fixes for those.
>
> Best,
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-23314
>
> On 06/07/2021 09:11, Li Jim wrote:
> > Hi, Mohit,
> >
> > Have you figured out any solutions to this problem?
> >
> > I am now facing exactly the same problem.
> >
> > I was using Flink 1.12.0 and I also upgraded it to 1.13.1, but the
> > checkpoint size is still growing.
> >
> > On 2021/06/02 15:45:59, "Singh, Mohit" <sngh...@amazon.com> wrote:
> >> Hi,
> >>
> >> I am facing an issue with the CEP operator where the checkpoint size
> >> keeps increasing even though the pattern is fully matched. I have a
> >> stream keyed by a unique user id and I want to detect a pattern of
> >> products purchased by a user.
> >>
> >> Here is the sample stream data:
> >>
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
> >> …
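(The `orders` POJO that `gson.fromJson(json, orders.class)` deserializes into is not shown anywhere in the thread. A minimal reconstruction matching the JSON keys in the sample data might look like the sketch below; the field types are assumptions inferred from the sample values, not the poster's actual class.)

```java
// Hypothetical reconstruction of the POJO used with Gson in the job below;
// not part of the original post. Field names match the JSON keys above.
public class orders {
    public String user_id;   // unique per user; used as the CEP key
    public String product;   // e.g. "product1", "product2", "product3"
    public int bids;
    public long ts;          // event timestamp in epoch milliseconds

    public static void main(String[] args) {
        // Populate one record by hand to mirror the first sample event.
        orders o = new orders();
        o.user_id = "45eff814-9016-4849-b607-391601e00e97";
        o.product = "product1";
        o.bids = 3;
        o.ts = 1622644781243L;
        // Same formatting the PatternSelectFunction in the job uses.
        System.out.println(o.user_id + "->" + o.ts);
    }
}
```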
> >>
> >> StreamExecutionEnvironment env =
> >>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> env.setParallelism(1);
> >>
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "localhost:9092");
> >> properties.setProperty("group.id", "cep");
> >>
> >> DataStream<orders> stream = env.addSource(
> >>         new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
> >>     .map(json -> gson.fromJson(json, orders.class))
> >>     .assignTimestampsAndWatermarks(
> >>         WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
> >>             .withTimestampAssigner((orders, timestamp) -> orders.ts));
> >>
> >> Pattern<orders, ?> pattern = Pattern.<orders>begin(
> >>         "start",
> >>         AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition<orders>() {
> >>     @Override
> >>     public boolean filter(orders value) throws Exception {
> >>         return value.product.equals("product1");
> >>     }
> >> }).times(1).followedBy("middle").where(new SimpleCondition<orders>() {
> >>     @Override
> >>     public boolean filter(orders value) throws Exception {
> >>         return value.product.equals("product2");
> >>     }
> >> }).oneOrMore().until(new SimpleCondition<orders>() {
> >>     @Override
> >>     public boolean filter(orders value) throws Exception {
> >>         return value.product.equals("product3");
> >>     }
> >> }).within(Time.seconds(10));
> >>
> >> PatternStream<orders> patternStream = CEP.pattern(
> >>     stream.keyBy((KeySelector<orders, String>) orders -> orders.user_id), pattern);
> >>
> >> DataStream<String> alerts = patternStream.select(
> >>     (PatternSelectFunction<orders, String>) matches ->
> >>         matches.get("start").get(0).user_id + "->" + matches.get("middle").get(0).ts);
> >> alerts.print();
> >>
> >> [inline image: screenshot of growing checkpoint size]
> >>
> >> I have also attached the checkpoint file.
> >>
> >> It looks like the NFA state keeps track of all keys seen and their
> >> start state, and that leads to an increase in checkpoint size if the
> >> keys are not reused in patterns. So, if I have a fixed number of keys,
> >> the size does not increase. Is this the expected behavior and a
> >> correct understanding?
> >> Is there a way to drop these keys once the pattern is matched? Or am
> >> I missing something here?
> >>
> >> Thanks,
> >> Mohit
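Mohit's reading matches what FLINK-23314 describes: the CEP operator keeps per-key state that is not cleared when keys are short-lived, so with ever-new keys the keyed state, and hence every checkpoint, only grows. A toy, Flink-free sketch of that failure mode follows; the class and map here are purely illustrative, not Flink's actual NFA data structures:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (NOT Flink's real implementation): a keyed operator that creates
// per-key state the first time a key is seen and never removes it. When every
// pattern instance uses a fresh key, the state -- and any checkpoint taken of
// it -- grows without bound, even after the patterns have matched.
public class LeakyKeyedState {
    private final Map<String, String> statePerKey = new HashMap<>();

    void onEvent(String userId) {
        // State is created lazily per key but never cleared after a match.
        statePerKey.putIfAbsent(userId, "start");
    }

    public static void main(String[] args) {
        LeakyKeyedState op = new LeakyKeyedState();
        for (int i = 0; i < 10_000; i++) {
            op.onEvent("user-" + i); // every "pattern" arrives under a new key
        }
        // All 10,000 keys are still resident even though each was used once.
        System.out.println(op.statePerKey.size());
    }
}
```

With a fixed key set the map size plateaus, which is consistent with Mohit's observation that checkpoint size stops growing when keys are reused.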