[ https://issues.apache.org/jira/browse/FLINK-36149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-36149:
-----------------------------------
    Labels: pull-request-available  (was: )

> Support cleaning up expired states to prevent the continuous increase of states and add RocksDB state cleanup configuration.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36149
>                 URL: https://issues.apache.org/jira/browse/FLINK-36149
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: luolei
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: 1724512324453.jpg, 1724512362249.jpg
>
> 1. Problem description:
> {code:java}
> select *
> from
> (
>     SELECT *, ROW_NUMBER() OVER (PARTITION BY song_id, user_id, FLOOR(proc_time TO day) ORDER BY proc_time ASC) AS row_num
>     from tableA
>     where cmd = 1 and user_id > 0
> )
> where row_num <= 10
> {code}
> Currently, the deduplication operator relies on the Flink state TTL mechanism. By default, expired state is only cleaned up when it is accessed again. In our case, the state key includes the FLOOR(proc_time TO day) timestamp. For example, if today is December 28th, every key written to the Flink state that day contains December 28th. Once it becomes December 29th, the keys of new records contain December 29th, and the December 28th keys are never accessed again. Because they are never accessed, the state TTL mechanism never cleans them up, so the Flink state grows indefinitely.
>
> {code:java}
> 2021-02-25 06:49:25,593 WARN  akka.remote.transport.netty.NettyTransport                  [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: hadoop02.tcd.com/9.44.33.8:60899
> 2021-02-25 06:49:25,593 WARN  akka.remote.ReliableDeliverySupervisor                      [] - Association with remote system [akka.tcp://fl...@hadoop02.tcd.com:60899] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://fl...@hadoop02.tcd.com:60899]] Caused by: [java.net.ConnectException: Connection refused: hadoop02.tcd.com/9.44.33.8:60899]
> 2021-02-25 06:49:32,762 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker container_e26_1614150721877_0021_01_000004 is terminated. Diagnostics: [2021-02-25 06:49:31.879] Container [pid=24324,containerID=container_e26_1614150721877_0021_01_000004] is running 103702528B beyond the 'PHYSICAL' memory limit. Current usage: 4.1 GB of 4 GB physical memory used; 6.3 GB of 8.4 GB virtual memory used.
> Killing container.
> Dump of the process-tree for container_e26_1614150721877_0021_01_000004 :
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> |- 24551 24324 24324 24324 (java) 1130639 94955 6799687680 1073522 /usr/java/jdk1.8.0_131/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager.rpc.address=hadoop02.tcd.com -Djobmanager.memory.jvm-overhead.min=201326592b -Dpipeline.classpaths= -Dtaskmanager.resource-id=container_e26_1614150721877_0021_01_000004 -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b -Dexecution.target=embedded -Dweb.tmpdir=/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8 -Dinternal.taskmanager.resource-id.metadata=hadoop03.tcd.com:8041 -Djobmanager.rpc.port=54474 -Dpipeline.jars=file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar -Drest.address=hadoop02.tcd.com -Djobmanager.memory.jvm-metaspace.size=268435456b -Djobmanager.memory.heap.size=1073741824b -Djobmanager.memory.jvm-overhead.max=201326592b
> |- 24324 24315 24324 24324 (bash) 1 0 11046912 372 /bin/bash -c /usr/java/jdk1.8.0_131/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir .
> -Djobmanager.rpc.address='hadoop02.tcd.com' -Djobmanager.memory.jvm-overhead.min='201326592b' -Dpipeline.classpaths='' -Dtaskmanager.resource-id='container_e26_1614150721877_0021_01_000004' -Dweb.port='0' -Djobmanager.memory.off-heap.size='134217728b' -Dexecution.target='embedded' -Dweb.tmpdir='/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8' -Dinternal.taskmanager.resource-id.metadata='hadoop03.tcd.com:8041' -Djobmanager.rpc.port='54474' -Dpipeline.jars='file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar' -Drest.address='hadoop02.tcd.com' -Djobmanager.memory.jvm-metaspace.size='268435456b' -Djobmanager.memory.heap.size='1073741824b' -Djobmanager.memory.jvm-overhead.max='201326592b' 1> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.out 2> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.err
> [2021-02-25 06:49:31.896] Container killed on request. Exit code is 143
> [2021-02-25 06:49:31.908] Container exited with a non-zero exit code 143.
> {code}
>
> 2. Solution:
> 2.1 Use the {{cleanupFullSnapshot}} and {{cleanupInRocksdbCompactFilter}} cleanup strategies provided by the Flink state TTL mechanism to remove old state even if it is never accessed again (a minimal configuration sketch of both strategies is included below):
> * {{cleanupFullSnapshot}}: removes expired state entries when a full snapshot is taken, thereby cleaning up old state.
> * {{cleanupInRocksdbCompactFilter}}: accepts a {{queryTimeAfterNumEntries}} parameter that determines after how many processed state entries the current timestamp is refreshed. When RocksDB runs compaction in the background, it uses this timestamp to decide whether a state entry has expired and filters out expired keys and values. A low {{queryTimeAfterNumEntries}} value speeds up state cleanup, but since Flink and RocksDB interact via JNI, refreshing the timestamp too frequently can incur significant overhead.
> 2.2 Add a RocksDB state cleanup configuration option to the Rank operators.
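To make 2.1 concrete, here is a minimal sketch of a state TTL configuration that enables both cleanup strategies through the DataStream-style {{StateTtlConfig}} API (the 1.x {{Time}}-based builder). The one-day TTL, the {{queryTimeAfterNumEntries}} value of 1000, and the state descriptor name are illustrative assumptions, not values taken from the proposed change.

{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// TTL of one day, matching the FLOOR(proc_time TO day) bucketing in the query above (assumed value).
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(1))
        // Drop expired entries whenever a full snapshot (savepoint / full checkpoint) is taken.
        .cleanupFullSnapshot()
        // Let RocksDB's background compaction filter drop expired entries; the current timestamp
        // used for the expiration check is refreshed after every 1000 processed state entries
        // (queryTimeAfterNumEntries, assumed value).
        .cleanupInRocksdbCompactFilter(1000)
        .build();

// Hypothetical state descriptor; any keyed state used by the operator is configured the same way.
ValueStateDescriptor<Long> rowNumState = new ValueStateDescriptor<>("rowNumState", Long.class);
rowNumState.enableTimeToLive(ttlConfig);
{code}

A smaller {{queryTimeAfterNumEntries}} gives the compaction filter a fresher timestamp and lets expired entries be dropped sooner, at the cost of more frequent JNI crossings, which is the trade-off described in 2.1.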
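For 2.2, the existing SQL-level knob is {{table.exec.state.ttl}}; the change proposed here would add a companion setting so that Rank/Deduplicate state is also cleaned up by RocksDB. The sketch below only uses the existing option; the new option's key is defined in the linked pull request and is not reproduced here.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Existing option: state retention (TTL) for stateful SQL operators such as the
// deduplication / rank operator produced by the query above.
tableEnv.getConfig().set("table.exec.state.ttl", "1 d");

// FLINK-36149 proposes an additional RocksDB cleanup configuration for the Rank operators,
// so that expired state is removed by the compaction filter even if it is never read again;
// the concrete option key is introduced by the pull request and intentionally not shown here.
{code}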