[ 
https://issues.apache.org/jira/browse/FLINK-22806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YUJIANBO updated FLINK-22806:
-----------------------------
    Description: 
I have added the parameter :
   1、tableEnv.getConfig().setIdleStateRetention:   3600 (one hour),  
   2、state.checkpoints.num-retained:3
   3、sql:
{code:sql}
// demo:  
    select count(1),  LISTAGG(concat(m,n)) from tabeA group by a, b, time_minute

//details:
CREATE TABLE user_behavior (
   `request_ip` STRING,
   `request_time` BIGINT,
   `header` STRING ,
   // timestamp  converted to specific per minute   
   `t_min` as cast(`request_time`-(`request_time` + 28800000)%60000 as BIGINT),
   `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000-28800,'yyyy-MM-dd 
HH:mm:ss')),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE) 
with (
   'connector' = 'kafka',
   ........ 
);


CREATE TABLE blackhole_table (
   `cnt` BIGINT,
   `lists` STRING
) WITH (
 'connector' = 'blackhole'
);


insert into blackhole_table 
select 
    count(*) as cnt, 
    LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING))) as 
lists
from user_behavior 
group by `request_ip`,`header`,`t_min`;
{code}

   4、state.backend: rocksdb  
   5、state.backend.incremental is true

I set the checkpoint state for one hour, but the size of the folder directory 
/checkpoint/shared is still growing.  I observed it for two days and guessed 
that there was expired data in the  /checkpoint/shared folder that had not been 
cleared?

What else can limit the growth of state?

  was:
I have added the parameter :
   1、tableEnv.getConfig().setIdleStateRetention:   3600 (one hour),  
   2、state.checkpoints.num-retained:3
   3、sql:
{code:sql}
// demo:  
    select count(1),  LISTAGG(concat(m,n)) from tabeA group by a, b, time_minute

///details:
    CREATE TABLE user_behavior (
   `request_ip` STRING,
   `request_time` BIGINT,
   `header` STRING ,
   `t_min` as cast(`request_time`-(`request_time` + 28800000)%60000 as BIGINT),
   `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000-28800,'yyyy-MM-dd 
HH:mm:ss')),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE) 
with (
   'connector' = 'kafka',
   ........ 
);


CREATE TABLE blackhole_table (
   `cnt` BIGINT,
   `lists` STRING
) WITH (
 'connector' = 'blackhole'
);


insert into blackhole_table 
select 
    count(*) as cnt, 
    LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING))) as 
lists
from user_behavior 
group by `request_ip`,`header`,`t_min`;
{code}

   4、state.backend: rocksdb  
   5、state.backend.incremental is true

I set the checkpoint state for one hour, but the size of the folder directory 
/checkpoint/shared is still growing.  I observed it for two days and guessed 
that there was expired data in the  /checkpoint/shared folder that had not been 
cleared?

What else can limit the growth of state?


> The folder /checkpoint/shared state of  FLINKSQL is getting bigger and bigger
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-22806
>                 URL: https://issues.apache.org/jira/browse/FLINK-22806
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.12.0
>            Reporter: YUJIANBO
>            Priority: Major
>
> I have added the parameter :
>    1、tableEnv.getConfig().setIdleStateRetention:   3600 (one hour),  
>    2、state.checkpoints.num-retained:3
>    3、sql:
> {code:sql}
> // demo:  
>     select count(1),  LISTAGG(concat(m,n)) from tabeA group by a, b, 
> time_minute
> //details:
> CREATE TABLE user_behavior (
>    `request_ip` STRING,
>    `request_time` BIGINT,
>    `header` STRING ,
>    // timestamp  converted to specific per minute   
>    `t_min` as cast(`request_time`-(`request_time` + 28800000)%60000 as 
> BIGINT),
>    `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000-28800,'yyyy-MM-dd 
> HH:mm:ss')),
>    WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE) 
> with (
>    'connector' = 'kafka',
>    ........ 
> );
> CREATE TABLE blackhole_table (
>    `cnt` BIGINT,
>    `lists` STRING
> ) WITH (
>  'connector' = 'blackhole'
> );
> insert into blackhole_table 
> select 
>     count(*) as cnt, 
>     LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING))) 
> as lists
> from user_behavior 
> group by `request_ip`,`header`,`t_min`;
> {code}
>    4、state.backend: rocksdb  
>    5、state.backend.incremental is true
> I set the checkpoint state for one hour, but the size of the folder directory 
> /checkpoint/shared is still growing.  I observed it for two days and guessed 
> that there was expired data in the  /checkpoint/shared folder that had not 
> been cleared?
> What else can limit the growth of state?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to