[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406527#comment-17406527 ]
Yun Tang commented on FLINK-23721: ---------------------------------- [~lmagics] could you share the code example to reproduce this on local environment so that we could run it directly. > Flink SQL state TTL has no effect when using non-incremental > RocksDBStateBackend > -------------------------------------------------------------------------------- > > Key: FLINK-23721 > URL: https://issues.apache.org/jira/browse/FLINK-23721 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends, Table SQL / Runtime > Affects Versions: 1.13.0 > Reporter: Q Kang > Priority: Major > > Take the following deduplication SQL program as an example: > {code:java} > SET table.exec.state.ttl=30s; > INSERT INTO tmp.blackhole_order_done_log > SELECT t.* FROM ( > SELECT *,ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime ASC) > AS rn > FROM rtdw_ods.kafka_order_done_log > ) AS t WHERE rn = 1; > {code} > When using RocksDBStateBackend with incremental checkpoint enabled, the size > of deduplication state seems OK. > FlinkCompactionFilter is also working, regarding to logs below: > {code:java} > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter > [] - RocksDB filter native code log: Call > FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3481026D01, Value > type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0 > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter > [] - RocksDB filter native code log: Last access timestamp: > 1628673475181 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: > 1628673701905 ms > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter > [] - RocksDB filter native code log: Decision: 1 > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter > [] - RocksDB filter native code log: Call > FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3484064901, Value > type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0 > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter > [] - RocksDB filter native code log: Last access timestamp: > 1628673672777 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: > 1628673701905 ms > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter > [] - RocksDB filter native code log: Decision: 0 > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter > [] - RocksDB filter native code log: Call > FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3483341D01, Value > type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0 > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter > [] - RocksDB filter native code log: Last access timestamp: > 1628673618973 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: > 1628673701905 ms > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter > [] - RocksDB filter native code log: Decision: 1 > {code} > However, after turning off incremental checkpoint, the state TTL seems not > effective at all: FlinkCompactionFilter logs are not printed, and the size of > deduplication state grows steadily up to several GBs (Kafka traffic is > somewhat heavy, at about 1K records per sec). > In contrast, FsStateBackend always works well. > -- This message was sent by Atlassian Jira (v8.3.4#803005)