I am running some performance tests with Flink CEP on Flink 1.13.1.
Below is the SQL:

    INSERT INTO to_kafka
    SELECT bizName, wdName, wdValue, zbValue, flowId
    FROM kafka_source
    MATCH_RECOGNIZE (
        PARTITION BY flow_id
        ORDER BY proctime
        MEASURES
            A.biz_name AS bizName,
            A.wd_name AS wdName,
            A.wd_value AS wdValue,
            MAP[A.zb_name, A.zb_value, B.zb_name, B.zb_value] AS zbValue,
            A.flow_id AS flowId
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B) WITHIN INTERVAL '10' SECOND
        DEFINE
            B AS B.flow_id = A.flow_id
    );

I added the WITHIN clause to keep the state from growing, but it does not seem to have any effect: the checkpoint size keeps growing quickly. I am using RocksDB with incremental checkpoints.