[ 
https://issues.apache.org/jira/browse/FLINK-36149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36149:
-----------------------------------
    Labels: pull-request-available  (was: )

> Support cleaning up expired states to prevent the continuous increase of 
> states and add RocksDB state cleanup configuration.
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36149
>                 URL: https://issues.apache.org/jira/browse/FLINK-36149
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: luolei
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: 1724512324453.jpg, 1724512362249.jpg
>
>
> 1、Problem description:
> {code:java}
> select *
> from 
> ( 
>     SELECT *, ROW_NUMBER() OVER (PARTITION BY song_id, user_id, 
> FLOOR(proc_time TO day) order by proc_time asc ) as row_num 
>     from  tableA
>     where  cmd = 1 and user_id > 0
> ) 
> where row_num <=10  {code}
> Currently, the deduplication operator relies on the Flink State TTL 
> mechanism. By default, this mechanism only cleans up expired state when it is 
> accessed again. In our case, the key in the Flink state includes the 
> FLOOR(proc_time TO DAY) timestamp. For example, if today is December 28th, 
> the new keys in the Flink state include December 28th. Once it becomes 
> December 29th, the keys for new records include December 29th, and the keys 
> from December 28th are never accessed again. Because they are never accessed, 
> the Flink State TTL mechanism never cleans them up, and the Flink state grows 
> indefinitely.
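For reference, a minimal sketch of the TTL behavior described above (the state name and TTL duration are illustrative, not taken from the actual Rank operator): with no background cleanup strategy enabled, an expired entry is only dropped on the next read of that exact key, so a key containing yesterday's FLOOR(proc_time TO DAY) value is never purged.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class LazyTtlSketch {
    public static void main(String[] args) {
        // Expired entries are removed lazily, on read access only; a key
        // whose FLOOR(proc_time TO DAY) component belongs to a past day is
        // never read again, so its entry stays in the state backend forever.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .disableCleanupInBackground()
                .build();

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("rowNum", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
    }
}
```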
>  
>  
> {code:java}
> 2021-02-25 06:49:25,593 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [null] failed with 
> java.net.ConnectException: Connection refused: 
> hadoop02.tcd.com/9.44.33.8:60899
> 2021-02-25 06:49:25,593 WARN  
> akka.remote.ReliableDeliverySupervisor                       [] - Association 
> with remote system [akka.tcp://fl...@hadoop02.tcd.com:60899] has failed, 
> address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://fl...@hadoop02.tcd.com:60899]] Caused by: 
> [java.net.ConnectException: Connection refused: 
> hadoop02.tcd.com/9.44.33.8:60899]
> 2021-02-25 06:49:32,762 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Worker container_e26_1614150721877_0021_01_000004 is terminated. Diagnostics: 
> [2021-02-25 06:49:31.879]Container 
> [pid=24324,containerID=container_e26_1614150721877_0021_01_000004] is running 
> 103702528B beyond the 'PHYSICAL' memory limit. Current usage: 4.1 GB of 4 GB 
> physical memory used; 6.3 GB of 8.4 GB virtual memory used. Killing 
> container.
> Dump of the process-tree for 
> container_e26_1614150721877_0021_01_000004 :        |- PID PPID PGRPID SESSID 
> CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) 
> RSSMEM_USAGE(PAGES) FULL_CMD_LINE        |- 24551 24324 24324 24324 (java) 
> 1130639 94955 6799687680 1073522 /usr/java/jdk1.8.0_131/bin/java 
> -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 
> -XX:MaxMetaspaceSize=268435456 
> -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties 
> -Dlog4j.configurationFile=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner -D 
> taskmanager.memory.framework.off-heap.size=134217728b -D 
> taskmanager.memory.network.max=359703515b -D 
> taskmanager.memory.network.min=359703515b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
> taskmanager.memory.task.heap.size=1530082070b -D 
> taskmanager.memory.task.off-heap.size=0b --configDir . 
> -Djobmanager.rpc.address=hadoop02.tcd.com 
> -Djobmanager.memory.jvm-overhead.min=201326592b -Dpipeline.classpaths= 
> -Dtaskmanager.resource-id=container_e26_1614150721877_0021_01_000004 
> -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b 
> -Dexecution.target=embedded 
> -Dweb.tmpdir=/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8 
> -Dinternal.taskmanager.resource-id.metadata=hadoop03.tcd.com:8041 
> -Djobmanager.rpc.port=54474 
> -Dpipeline.jars=file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar
>  -Drest.address=hadoop02.tcd.com 
> -Djobmanager.memory.jvm-metaspace.size=268435456b 
> -Djobmanager.memory.heap.size=1073741824b 
> -Djobmanager.memory.jvm-overhead.max=201326592b        |- 24324 24315 24324 
> 24324 (bash) 1 0 11046912 372 /bin/bash -c /usr/java/jdk1.8.0_131/bin/java 
> -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 
> -XX:MaxMetaspaceSize=268435456 
> -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties 
> -Dlog4j.configurationFile=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner -D 
> taskmanager.memory.framework.off-heap.size=134217728b -D 
> taskmanager.memory.network.max=359703515b -D 
> taskmanager.memory.network.min=359703515b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
> taskmanager.memory.task.heap.size=1530082070b -D 
> taskmanager.memory.task.off-heap.size=0b --configDir . 
> -Djobmanager.rpc.address='hadoop02.tcd.com' 
> -Djobmanager.memory.jvm-overhead.min='201326592b' -Dpipeline.classpaths='' 
> -Dtaskmanager.resource-id='container_e26_1614150721877_0021_01_000004' 
> -Dweb.port='0' -Djobmanager.memory.off-heap.size='134217728b' 
> -Dexecution.target='embedded' 
> -Dweb.tmpdir='/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8' 
> -Dinternal.taskmanager.resource-id.metadata='hadoop03.tcd.com:8041' 
> -Djobmanager.rpc.port='54474' 
> -Dpipeline.jars='file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar'
>  -Drest.address='hadoop02.tcd.com' 
> -Djobmanager.memory.jvm-metaspace.size='268435456b' 
> -Djobmanager.memory.heap.size='1073741824b' 
> -Djobmanager.memory.jvm-overhead.max='201326592b' 1> 
> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.out
>  2> 
> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.err
> [2021-02-25 06:49:31.896]Container killed on request. Exit code is 143
> [2021-02-25 06:49:31.908]Container exited with a non-zero exit code 143. 
> {code}
>  
> 2、Solution:
> 2.1  The Flink State TTL mechanism provides the {{cleanupFullSnapshot}} and 
> {{cleanupInRocksdbCompactFilter}} strategies to clean up old states even if 
> they are never accessed again.
>  * {{{}cleanupFullSnapshot{}}}: Removes expired state entries when taking a 
> full snapshot, so that restoring from the snapshot starts from a cleaned-up 
> state.
>  * {{{}cleanupInRocksdbCompactFilter{}}}: Accepts a 
> {{queryTimeAfterNumEntries}} parameter, which determines after how many 
> processed state entries the current timestamp is refreshed. When RocksDB 
> performs compactions in the background, it uses this timestamp to decide 
> whether a state entry has expired and filters out expired keys and values. A 
> lower {{queryTimeAfterNumEntries}} value speeds up state cleanup, but since 
> Flink calls into RocksDB via JNI, refreshing the timestamp too frequently 
> incurs significant overhead.
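A hedged sketch of how these two strategies can be enabled on a {{StateTtlConfig}} (the TTL duration and the {{queryTimeAfterNumEntries}} value of 1000 are illustrative choices, not values prescribed by this issue):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class CleanupTtlSketch {
    public static void main(String[] args) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(1))
                // Drop expired entries while taking a full snapshot
                // (savepoint / full checkpoint).
                .cleanupFullSnapshot()
                // Refresh the timestamp handed to the RocksDB compaction
                // filter after every 1000 processed entries; smaller values
                // clean up sooner but add JNI overhead.
                .cleanupInRocksdbCompactFilter(1000L)
                .build();
    }
}
```

With this configuration, keys from past days are filtered out during background RocksDB compactions even though they are never read again, which addresses the unbounded state growth described above.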
> 2.2  Add a RocksDB state cleanup configuration option to the Rank operators. 
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
