Hi,

Sorry for the late reply.

Indeed, I found a couple of problems with clearing the state for short-lived
keys. I created a JIRA issue [1] to track it and opened a PR with fixes for
those (it needs test coverage before it can be merged).

Best,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-23314

On 06/07/2021 09:11, Li Jim wrote:
> Hi, Mohit,
>
> Have you figured out any solutions to this problem?
>
> I am now facing exactly the same problem.
>
> I was using Flink version 1.12.0 and I also upgraded to 1.13.1, but the
> checkpoint size is still growing.
>
> On 2021/06/02 15:45:59, "Singh, Mohit" <sngh...@amazon.com> wrote:
>> Hi,
>>
>> I am facing an issue with the CEP operator where the checkpoint size keeps
>> increasing even though the pattern is fully matched. I have a stream with
>> unique user IDs and I want to detect a pattern of products purchased by a
>> user.
>>
>> Here is the sample stream data:
>>
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
>> …..
>> …..
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setParallelism(1);
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> properties.setProperty("group.id", "cep");
>>
>> DataStream<orders> stream = env
>>     .addSource(new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>>     .map(json -> gson.fromJson(json, orders.class))
>>     .assignTimestampsAndWatermarks(
>>         WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
>>             .withTimestampAssigner((orders, timestamp) -> orders.ts));
>>
>> Pattern<orders, ?> pattern = Pattern.<orders>begin(
>>         "start", AfterMatchSkipStrategy.skipPastLastEvent())
>>     .where(new SimpleCondition<orders>() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product1");
>>         }
>>     })
>>     .times(1)
>>     .followedBy("middle")
>>     .where(new SimpleCondition<orders>() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product2");
>>         }
>>     })
>>     .oneOrMore()
>>     .until(new SimpleCondition<orders>() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product3");
>>         }
>>     })
>>     .within(Time.seconds(10));
>>
>> PatternStream<orders> patternStream = CEP.pattern(
>>     stream.keyBy((KeySelector<orders, String>) orders -> orders.user_id),
>>     pattern);
>>
>> DataStream<String> alerts = patternStream.select(
>>     (PatternSelectFunction<orders, String>) matches ->
>>         matches.get("start").get(0).user_id + "->" + matches.get("middle").get(0).ts);
>> alerts.print();
>>
>> [cid:image001.png@01D7579C.775FCA00]
>>
>> I have also attached the checkpoint file.
>>
>> It looks like the NFA state keeps track of all keys seen and the start
>> state, and that leads to an increase in checkpoint size if the keys are not
>> reused in patterns. So, if I have a fixed number of keys, the size does not
>> increase. Is this the expected behavior, and is my understanding correct?
>> Is there a way to drop these keys once the pattern is matched, or am I
>> missing something here?
>>
>> Thanks,
>> Mohit
>>
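
For readers of this thread, the effect described in the quoted message (an entry kept for every key ever seen, even after that key's pattern has completed) can be illustrated with a small toy model in plain Java. This is only a sketch under stated assumptions: `KeyedStateGrowthSketch`, `Stage`, `advance` and `retainedKeys` are hypothetical names, a `HashMap` stands in for keyed state, and the three-stage machine is a simplification of the product1 / product2 / product3 pattern above. It is not Flink's NFA and not the fix from FLINK-23314.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: a HashMap plays the role of per-key operator state.
// None of these names exist in Flink; they are made up for illustration.
final class KeyedStateGrowthSketch {

    // Simplified stages of the product1 -> product2+ -> product3 pattern.
    enum Stage { START, SAW_PRODUCT1, SAW_PRODUCT2 }

    // Moves one key's matcher forward by one event.
    public static Stage advance(Stage current, String product) {
        switch (current) {
            case START:
                return product.equals("product1") ? Stage.SAW_PRODUCT1 : Stage.START;
            case SAW_PRODUCT1:
                return product.equals("product2") ? Stage.SAW_PRODUCT2 : Stage.SAW_PRODUCT1;
            case SAW_PRODUCT2:
                // product3 ends the match, so the matcher is back at the start stage.
                return product.equals("product3") ? Stage.START : Stage.SAW_PRODUCT2;
            default:
                throw new IllegalStateException("unknown stage " + current);
        }
    }

    // Sends product1, product2, product3 for each of `users` distinct keys and
    // returns how many keys still have an entry in the state map afterwards.
    public static int retainedKeys(int users, boolean clearWhenBackAtStart) {
        Map<String, Stage> state = new HashMap<>();
        String[] products = {"product1", "product2", "product3"};
        for (int u = 0; u < users; u++) {
            String key = "user-" + u; // every user id is seen only once
            for (String product : products) {
                Stage next = advance(state.getOrDefault(key, Stage.START), product);
                if (next == Stage.START && clearWhenBackAtStart) {
                    state.remove(key);    // nothing pending for this key: drop the entry
                } else {
                    state.put(key, next); // otherwise keep an entry for the key
                }
            }
        }
        return state.size();
    }

    public static void main(String[] args) {
        System.out.println("entries kept without cleanup: " + retainedKeys(1000, false)); // 1000
        System.out.println("entries kept with cleanup:    " + retainedKeys(1000, true));  // 0
    }
}
```

In this model the map grows linearly with the number of distinct keys unless entries that are back at the start stage are removed, which matches the observation that a fixed set of keys does not grow the checkpoint.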