Hi Mohit, have you figured out any solution to this problem?
I am now facing exactly the same problem. I was using Flink 1.12.0, and I also upgraded to 1.13.1, but the checkpoint size is still growing.

On 2021/06/02 15:45:59, "Singh, Mohit" <sngh...@amazon.com> wrote:
> Hi,
>
> I am facing an issue with the CEP operator where the checkpoint size keeps
> increasing even though the pattern is fully matched. I have a stream with a
> unique user id, and I want to detect a pattern of products purchased by a user.
>
> Here is the sample stream data:
>
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
> …
> …
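(Side note, in case anyone wants to reproduce this: the orders POJO that the code below deserializes into is not shown in the original mail. Based on the JSON sample above, I'd assume it looks roughly like this; field names and types are inferred, not taken from Mohit's code:)

// Inferred sketch of the POJO the job maps each JSON record into.
public class orders {
    public String user_id;  // UUID string, used as the key of the stream
    public String product;  // "product1" / "product2" / "product3"
    public int bids;
    public long ts;         // event time in epoch millis, read by the timestamp assigner
}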
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "cep");
>
> DataStream<orders> stream = env.addSource(
>                 new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>         .map(json -> gson.fromJson(json, orders.class))
>         .assignTimestampsAndWatermarks(
>                 WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
>                         .withTimestampAssigner((orders, timestamp) -> orders.ts));
>
> Pattern<orders, ?> pattern = Pattern.<orders>begin(
>         "start",
>         AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition<orders>() {
>     @Override
>     public boolean filter(orders value) throws Exception {
>         return value.product.equals("product1");
>     }
> }).times(1).followedBy("middle").where(new SimpleCondition<orders>() {
>     @Override
>     public boolean filter(orders value) throws Exception {
>         return value.product.equals("product2");
>     }
> }).oneOrMore().until(new SimpleCondition<orders>() {
>     @Override
>     public boolean filter(orders value) throws Exception {
>         return value.product.equals("product3");
>     }
> }).within(Time.seconds(10));
>
> PatternStream<orders> patternStream = CEP.pattern(
>         stream.keyBy((KeySelector<orders, String>) orders -> orders.user_id), pattern);
>
> DataStream<String> alerts = patternStream.select(
>         (PatternSelectFunction<orders, String>) matches ->
>                 matches.get("start").get(0).user_id + "->" + matches.get("middle").get(0).ts);
> alerts.print();
>
> [inline image: image001.png]
>
> I have also attached the checkpoint file.
>
> It looks like the NFA state keeps track of all keys seen and of the start
> state, and that leads to an increase in checkpoint size if keys are not reused
> in patterns. So if I have a fixed number of keys, the size does not increase.
> Is this the expected behavior and a correct understanding?
> Is there a way to drop these keys once the pattern is matched? Or am I missing
> something here?
>
> Thanks,
> Mohit
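In the meantime, the workaround I am experimenting with is to replace the CEP operator with a hand-rolled KeyedProcessFunction, where the per-key state can be cleared explicitly as soon as the pattern completes or times out. This is only a rough sketch of the idea, not a drop-in equivalent: it approximates the begin/oneOrMore/until/within semantics above and assumes the orders POJO I sketched earlier.

import java.util.Iterator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hand-rolled version of the pattern: product1, then one or more product2,
// terminated by product3 or a 10 s event-time timeout. All state for the key
// is cleared as soon as the match completes or times out, so nothing
// accumulates per key the way the CEP NFA state seems to.
public class PurchasePatternFunction extends KeyedProcessFunction<String, orders, String> {

    private transient ValueState<Long> startTs; // event time of the product1 event
    private transient ListState<orders> middle; // product2 events seen so far

    @Override
    public void open(Configuration parameters) {
        startTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("startTs", Long.class));
        middle = getRuntimeContext().getListState(
                new ListStateDescriptor<>("middle", orders.class));
    }

    @Override
    public void processElement(orders value, Context ctx, Collector<String> out) throws Exception {
        if (value.product.equals("product1")) {
            // (Re)start a match and register a cleanup timer 10 s of event time later.
            clear(ctx);
            startTs.update(value.ts);
            ctx.timerService().registerEventTimeTimer(value.ts + 10_000L);
        } else if (startTs.value() != null && value.product.equals("product2")) {
            middle.add(value);
        } else if (startTs.value() != null && value.product.equals("product3")) {
            // until() condition reached: emit if at least one product2 was
            // collected, then drop all state for this key.
            Iterator<orders> it = middle.get().iterator();
            if (it.hasNext()) {
                out.collect(ctx.getCurrentKey() + "->" + it.next().ts);
            }
            clear(ctx);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // within(10 s) timeout: the partial match expired, drop all state.
        clear(ctx);
    }

    private void clear(Context ctx) throws Exception {
        Long start = startTs.value();
        if (start != null) {
            ctx.timerService().deleteEventTimeTimer(start + 10_000L);
        }
        startTs.clear();
        middle.clear();
    }
}

It would be wired in as stream.keyBy(o -> o.user_id).process(new PurchasePatternFunction()) in place of the CEP.pattern(...) call. If the CEP NFA really does keep an entry per key forever, this at least bounds the state to keys with an in-flight partial match.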