[ 
https://issues.apache.org/jira/browse/FLINK-36149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876747#comment-17876747
 ] 

luolei commented on FLINK-36149:
--------------------------------

Thank you for the suggestion. Yes, it is more reasonable for the parameter to 
take effect directly at the state backend level, since that lets not only SQL 
jobs but also DataStream jobs benefit from it.
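For reference, the cleanup strategies discussed in the issue can already be 
enabled per state via {{StateTtlConfig}}. A minimal sketch (the 1-day TTL and 
the 1000-entry refresh interval are illustrative values, not recommendations):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlCleanupExample {

    public static StateTtlConfig buildTtlConfig() {
        // Illustrative 1-day TTL, matching the daily FLOOR(proc_time TO DAY)
        // component in the deduplication key.
        return StateTtlConfig
                .newBuilder(Time.days(1))
                // Drop expired entries whenever a full snapshot is taken.
                .cleanupFullSnapshot()
                // Let RocksDB's compaction filter drop expired entries in the
                // background; refresh the current timestamp after every 1000
                // processed state entries (queryTimeAfterNumEntries).
                .cleanupInRocksdbCompactFilter(1000L)
                .build();
    }
}
```

The config would then be attached to the relevant state descriptor with 
{{enableTimeToLive(ttlConfig)}}; the point of this ticket is that the Rank and 
Deduplicate operators do not expose such a knob today.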

> Support cleaning up expired states to prevent the continuous increase of 
> states and add RocksDB state cleanup configuration.
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36149
>                 URL: https://issues.apache.org/jira/browse/FLINK-36149
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: luolei
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: 1724512324453.jpg, 1724512362249.jpg
>
>
> 1、Problem description:
> {code:java}
> SELECT *
> FROM (
>     SELECT *,
>            ROW_NUMBER() OVER (
>                PARTITION BY song_id, user_id, FLOOR(proc_time TO DAY)
>                ORDER BY proc_time ASC
>            ) AS row_num
>     FROM tableA
>     WHERE cmd = 1 AND user_id > 0
> )
> WHERE row_num <= 10 {code}
> Currently, the deduplication operator relies on the Flink State TTL 
> mechanism. By default, expired state is only cleaned up when it is accessed 
> again. In our case, the key in the Flink state includes the 
> FLOOR(proc_time TO DAY) timestamp. For example, if today is December 28th, 
> the new keys written to state include December 28th. Once it becomes December 
> 29th, the keys for new records include December 29th, and the December 28th 
> keys are never accessed again. Since they are never accessed, they are never 
> cleaned up by the State TTL mechanism, so the state in Flink grows 
> indefinitely.
>  
>  
> {code:java}
> 2021-02-25 06:49:25,593 WARN  akka.remote.transport.netty.NettyTransport     
>              [] - Remote connection to [null] failed with 
> java.net.ConnectException: Connection refused: 
> hadoop02.tcd.com/9.44.33.8:60899
> 2021-02-25 06:49:25,593 WARN  
> akka.remote.ReliableDeliverySupervisor                       [] - Association 
> with remote system [akka.tcp://fl...@hadoop02.tcd.com:60899] has failed, 
> address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://fl...@hadoop02.tcd.com:60899]] Caused by: 
> [java.net.ConnectException: Connection refused: 
> hadoop02.tcd.com/9.44.33.8:60899]
> 2021-02-25 06:49:32,762 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Worker container_e26_1614150721877_0021_01_000004 is terminated. Diagnostics: 
> [2021-02-25 06:49:31.879]Container 
> [pid=24324,containerID=container_e26_1614150721877_0021_01_000004] is running 
> 103702528B beyond the 'PHYSICAL' memory limit. Current usage: 4.1 GB of 4 GB 
> physical memory used; 6.3 GB of 8.4 GB virtual memory used. Killing 
> container.
> Dump of the process-tree for container_e26_1614150721877_0021_01_000004:
>         |- PID PPID PGRPID SESSID 
> CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) 
> RSSMEM_USAGE(PAGES) FULL_CMD_LINE        |- 24551 24324 24324 24324 (java) 
> 1130639 94955 6799687680 1073522 /usr/java/jdk1.8.0_131/bin/java 
> -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 
> -XX:MaxMetaspaceSize=268435456 
> -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties 
> -Dlog4j.configurationFile=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner -D 
> taskmanager.memory.framework.off-heap.size=134217728b -D 
> taskmanager.memory.network.max=359703515b -D 
> taskmanager.memory.network.min=359703515b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
> taskmanager.memory.task.heap.size=1530082070b -D 
> taskmanager.memory.task.off-heap.size=0b --configDir . 
> -Djobmanager.rpc.address=hadoop02.tcd.com 
> -Djobmanager.memory.jvm-overhead.min=201326592b -Dpipeline.classpaths= 
> -Dtaskmanager.resource-id=container_e26_1614150721877_0021_01_000004 
> -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b 
> -Dexecution.target=embedded 
> -Dweb.tmpdir=/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8 
> -Dinternal.taskmanager.resource-id.metadata=hadoop03.tcd.com:8041 
> -Djobmanager.rpc.port=54474 
> -Dpipeline.jars=file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar
>  -Drest.address=hadoop02.tcd.com 
> -Djobmanager.memory.jvm-metaspace.size=268435456b 
> -Djobmanager.memory.heap.size=1073741824b 
> -Djobmanager.memory.jvm-overhead.max=201326592b        |- 24324 24315 24324 
> 24324 (bash) 1 0 11046912 372 /bin/bash -c /usr/java/jdk1.8.0_131/bin/java 
> -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 
> -XX:MaxMetaspaceSize=268435456 
> -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties 
> -Dlog4j.configurationFile=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner -D 
> taskmanager.memory.framework.off-heap.size=134217728b -D 
> taskmanager.memory.network.max=359703515b -D 
> taskmanager.memory.network.min=359703515b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
> taskmanager.memory.task.heap.size=1530082070b -D 
> taskmanager.memory.task.off-heap.size=0b --configDir . 
> -Djobmanager.rpc.address='hadoop02.tcd.com' 
> -Djobmanager.memory.jvm-overhead.min='201326592b' -Dpipeline.classpaths='' 
> -Dtaskmanager.resource-id='container_e26_1614150721877_0021_01_000004' 
> -Dweb.port='0' -Djobmanager.memory.off-heap.size='134217728b' 
> -Dexecution.target='embedded' 
> -Dweb.tmpdir='/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8' 
> -Dinternal.taskmanager.resource-id.metadata='hadoop03.tcd.com:8041' 
> -Djobmanager.rpc.port='54474' 
> -Dpipeline.jars='file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar'
>  -Drest.address='hadoop02.tcd.com' 
> -Djobmanager.memory.jvm-metaspace.size='268435456b' 
> -Djobmanager.memory.heap.size='1073741824b' 
> -Djobmanager.memory.jvm-overhead.max='201326592b' 1> 
> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.out
>  2> 
> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.err
> [2021-02-25 06:49:31.896]Container killed on request. Exit code is 143
> [2021-02-25 06:49:31.908]Container exited with a non-zero exit code 143. 
> {code}
>  
> 2、Solution:
> 2.1  The Flink State TTL mechanism provides the {{cleanupFullSnapshot}} and 
> {{cleanupInRocksdbCompactFilter}} strategies to clean up old state even if it 
> is never accessed again.
>  * {{cleanupFullSnapshot}}: removes expired state entries when a full 
> snapshot is taken, so old state does not make it into the snapshot.
>  * {{cleanupInRocksdbCompactFilter}}: accepts a 
> {{queryTimeAfterNumEntries}} parameter, which determines after how many 
> processed state entries the current timestamp is refreshed. When RocksDB 
> performs background compactions, it uses this timestamp to decide whether a 
> state entry has expired and filters out expired keys and values. Setting 
> {{queryTimeAfterNumEntries}} low speeds up state cleanup, but since Flink 
> accesses RocksDB via JNI, frequent calls incur significant overhead.
> 2.2  Add a RocksDB state cleanup configuration to the Rank operators.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
