I am running some performance tests with Flink CEP, using MATCH_RECOGNIZE in Flink SQL.

Flink version 1.13.1.

Below is the SQL:

INSERT INTO to_kafka
SELECT bizName, wdName, wdValue, zbValue, flowId FROM kafka_source
MATCH_RECOGNIZE
(
    PARTITION BY flow_id
    ORDER BY proctime
    MEASURES
        A.biz_name AS bizName,
        A.wd_name AS wdName,
        A.wd_value AS wdValue,
        MAP[
            A.zb_name, A.zb_value,
            B.zb_name, B.zb_value
        ] AS zbValue,
        A.flow_id AS flowId
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN ( A B ) WITHIN INTERVAL '10' SECOND
    DEFINE
        B AS B.flow_id = A.flow_id
);

I added the WITHIN clause to keep the state from growing, but it does not seem to have any effect.

The checkpoint size keeps growing quickly.

I am using the RocksDB state backend with incremental checkpoints.

