Hi,

Sorry for the late reply.

Indeed, I found a couple of problems with clearing the state for
short-lived keys. I created a JIRA issue [1] to track it and opened a PR
with fixes for those (it still needs test coverage before it can be merged).
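
To make the symptom concrete, here is a toy model in plain Java. It is only
a sketch under stated assumptions: it is not Flink's CEP operator code, the
class and method names are hypothetical, and the "state" is just a map entry
per key. It shows how per-key state that is reset to a start state instead of
being removed keeps growing when every key is seen only once.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only: NOT Flink's CEP operator. All names here are hypothetical. */
public class KeyedStateGrowthSketch {

    /** Stands in for keyed state that would end up in a checkpoint. */
    private final Map<String, String> statePerKey = new HashMap<>();
    private final boolean clearWhenIdle;

    public KeyedStateGrowthSketch(boolean clearWhenIdle) {
        this.clearWhenIdle = clearWhenIdle;
    }

    /** Handles one event: "product1" starts a match, "product3" ends it. */
    public void onEvent(String userId, String product) {
        if (product.equals("product1")) {
            statePerKey.put(userId, "MATCHING");
        } else if (product.equals("product3") && statePerKey.containsKey(userId)) {
            if (clearWhenIdle) {
                // Nothing is pending for this key any more, so drop its entry.
                statePerKey.remove(userId);
            } else {
                // Entry is reset to the start state but stays around forever.
                statePerKey.put(userId, "START");
            }
        }
    }

    public int retainedKeys() {
        return statePerKey.size();
    }

    /** Sends one complete match for each of `users` distinct, short-lived keys. */
    public static int simulate(int users, boolean clearWhenIdle) {
        KeyedStateGrowthSketch op = new KeyedStateGrowthSketch(clearWhenIdle);
        for (int i = 0; i < users; i++) {
            String userId = "user-" + i;
            op.onEvent(userId, "product1");
            op.onEvent(userId, "product2");
            op.onEvent(userId, "product3");
        }
        return op.retainedKeys();
    }

    public static void main(String[] args) {
        System.out.println("retained without clearing: " + simulate(1000, false));
        System.out.println("retained with clearing:    " + simulate(1000, true));
    }
}
```

In this model the number of retained entries equals the number of distinct
keys unless idle entries are removed, which matches the observation that the
size stays flat when the set of keys is fixed.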

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-23314

On 06/07/2021 09:11, Li Jim wrote:
> Hi, Mohit, 
>
> Have you figured out any solutions to this problem?
>
> I am now facing exactly the same problem.
>
> I was using Flink version 1.12.0 and also upgraded to 1.13.1, but the
> checkpoint size is still growing.
>
> On 2021/06/02 15:45:59, "Singh, Mohit" <sngh...@amazon.com> wrote: 
>> Hi,
>>
>> I am facing an issue with the CEP operator where the checkpoint size keeps
>> increasing even though the pattern is fully matched. I have a stream with
>> unique user ids, and I want to detect a pattern of products purchased by
>> each user.
>>
>> Here is the sample stream data:
>>
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product1","bids":3,"ts":"1622644781243"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":6,"ts":"1622644781245"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":4,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":2,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product2","bids":1,"ts":"1622644781248"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97","product":"product3","bids":1,"ts":"1622644781248"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product1","bids":3,"ts":"1622644782235"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":6,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":4,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":2,"ts":"1622644782237"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product2","bids":1,"ts":"1622644782238"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b","product":"product3","bids":1,"ts":"1622644782239"}
>> …..
>> …..
>>
>> StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setParallelism(1);
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> properties.setProperty("group.id", "cep");
>>
>> DataStream<orders> stream = env
>>         .addSource(new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>>         .map(json -> gson.fromJson(json, orders.class))
>>         .assignTimestampsAndWatermarks(
>>                 WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
>>                         .withTimestampAssigner((orders, timestamp) -> orders.ts));
>>
>> Pattern<orders, ?> pattern = Pattern
>>         .<orders>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
>>         .where(new SimpleCondition<orders>() {
>>             @Override
>>             public boolean filter(orders value) throws Exception {
>>                 return value.product.equals("product1");
>>             }
>>         })
>>         .times(1)
>>         .followedBy("middle")
>>         .where(new SimpleCondition<orders>() {
>>             @Override
>>             public boolean filter(orders value) throws Exception {
>>                 return value.product.equals("product2");
>>             }
>>         })
>>         .oneOrMore()
>>         .until(new SimpleCondition<orders>() {
>>             @Override
>>             public boolean filter(orders value) throws Exception {
>>                 return value.product.equals("product3");
>>             }
>>         })
>>         .within(Time.seconds(10));
>>
>> PatternStream<orders> patternStream = CEP.pattern(
>>         stream.keyBy((KeySelector<orders, String>) orders -> orders.user_id),
>>         pattern);
>>
>> DataStream<String> alerts = patternStream.select(
>>         (PatternSelectFunction<orders, String>) matches ->
>>                 matches.get("start").get(0).user_id + "->" +
>>                 matches.get("middle").get(0).ts);
>> alerts.print();
>>
>> I have also attached the checkpoint file.
>>
>> It looks like the NFA state keeps track of all keys seen, along with the
>> start state, and that leads to an increase in checkpoint size if the keys
>> are not reused in patterns. So, if I have a fixed number of keys, the size
>> does not increase. Is this the expected behavior, and is my understanding
>> correct?
>> Is there a way to drop these keys once the pattern is matched, or am I
>> missing something here?
>>
>> Thanks,
>> Mohit
>>
