Feifan Wang created FLINK-37318:
-----------------------------------

             Summary: RocksDBKeyedStateBackend disposed before task exit
                 Key: FLINK-37318
                 URL: https://issues.apache.org/jira/browse/FLINK-37318
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.12.2
            Reporter: Feifan Wang
We encountered two cases in the same job where RocksDBKeyedStateBackend was disposed before the task was cancelled, as shown in the following log:

{code:java}
2024-12-01 12:12:12,703 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Closed RocksDB State Backend. Cleaning up RocksDB working directory /data4/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1725359226161_2973711/flink-io-a9a5e6ba-f582-40e0-a6f1-f4221911de28/job_34a8565e51c2c2e22bae59f537e5775f_op_KeyedProcessOperator_174cfe0875f20e2c391c93234c3d8810__65_80__uuid_01262a91-5f77-4f57-9cc1-edf586a5d799.
2024-12-01 12:12:21,866 WARN org.apache.flink.runtime.taskmanager.Task - OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, imei, idfa, mac_address, client_ip, device_model, os_version, os, extension, stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 (3e1d0680c28b9bc1e58fc5eaf341e6a9) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Error while deserializing the user key.
	at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:486)
	at org.apache.flink.contrib.streaming.state.RocksDBMapState$1.next(RocksDBMapState.java:197)
	at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:203)
	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:219)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:400)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:629)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:767)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:578)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
	at org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:233)
	at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:72)
	at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:30)
	at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:380)
	at org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:64)
	at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:483)
	... 20 more
2024-12-01 12:12:21,868 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, imei, idfa, mac_address, client_ip, device_model, os_version, os, extension, stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 (3e1d0680c28b9bc1e58fc5eaf341e6a9).
2024-12-01 12:12:21,895 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, imei, idfa, mac_address, client_ip, device_model, os_version, os, extension, stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 3e1d0680c28b9bc1e58fc5eaf341e6a9.
2024-12-01 12:12:21,964 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (49/80)#0 (a533508520a4898f3658291d2eb7d509).
2024-12-01 12:12:21,964 INFO org.apache.flink.runtime.taskmanager.Task - IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (49/80)#0 (a533508520a4898f3658291d2eb7d509) switched from RUNNING to CANCELING.
2024-12-01 12:12:21,964 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (49/80)#0 (a533508520a4898f3658291d2eb7d509).
2024-12-01 12:12:21,971 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (65/80)#0 (54c43203bab937f639cf9564425bface).
2024-12-01 12:12:21,971 INFO org.apache.flink.runtime.taskmanager.Task - IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (65/80)#0 (54c43203bab937f639cf9564425bface) switched from RUNNING to CANCELING.
2024-12-01 12:12:21,971 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (65/80)#0 (54c43203bab937f639cf9564425bface).
2024-12-01 12:12:21,972 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (17/80)#0 (51c0a203e2dce0cceb2da1a1f992cd69).
2024-12-01 12:12:21,972 INFO org.apache.flink.runtime.taskmanager.Task - IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (17/80)#0 (51c0a203e2dce0cceb2da1a1f992cd69) switched from RUNNING to CANCELING.
2024-12-01 12:12:21,972 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (17/80)#0 (51c0a203e2dce0cceb2da1a1f992cd69).
2024-12-01 12:12:21,973 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (33/80)#0 (da848644a3859496193cbe46a3730bfd).
{code}

Note that the log contains no task-cancellation message before the "Closed RocksDB State Backend" message at 2024-12-01 12:12:12,703, yet about 9 seconds later, at 12:12:21,866, the task fails with a deserialization exception from RocksDBMapState. This looks like a bug. As a first step, I would like to understand why RocksDBKeyedStateBackend is disposed at that point. Can anyone help?
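The timeline argument in this report (the backend close precedes every cancellation message, and the failure follows about 9 seconds later) can be checked mechanically against a TaskManager log. Below is a minimal sketch, assuming each log line starts with a timestamp in the "yyyy-MM-dd HH:mm:ss,SSS" form seen above. The class LogTimeline and its methods are hypothetical helpers, not Flink APIs; only the two marker strings are taken from the log. The sample lines in main are abridged from the log above.

{code:java}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink): checks whether "Closed RocksDB State Backend"
 * was logged before any "Attempting to cancel task" line, and measures the gap between
 * two log lines.
 */
public class LogTimeline {

    // Timestamp prefix as it appears in the log, e.g. "2024-12-01 12:12:12,703".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Parses the 23-character timestamp at the start of a log line (assumed to be present). */
    public static LocalDateTime timestampOf(String line) {
        return LocalDateTime.parse(line.substring(0, 23), TS);
    }

    /** Index of the first line containing the marker, or -1 if no line does. */
    public static int firstIndexOf(List<String> lines, String marker) {
        for (int i = 0; i < lines.size(); i++) {
            if (lines.get(i).contains(marker)) {
                return i;
            }
        }
        return -1;
    }

    /**
     * True if the backend close was logged and no cancellation had been logged before it.
     * Only the first occurrence of each marker is considered; lines are not filtered by
     * subtask or attempt ID.
     */
    public static boolean closedBeforeCancel(List<String> lines) {
        int closed = firstIndexOf(lines, "Closed RocksDB State Backend");
        int cancel = firstIndexOf(lines, "Attempting to cancel task");
        return closed >= 0 && (cancel < 0 || closed < cancel);
    }

    /** Milliseconds between the timestamps of two log lines. */
    public static long gapMillis(String earlier, String later) {
        return Duration.between(timestampOf(earlier), timestampOf(later)).toMillis();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2024-12-01 12:12:12,703 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Closed RocksDB State Backend.",
                "2024-12-01 12:12:21,866 WARN org.apache.flink.runtime.taskmanager.Task - OverAggregate (65/80)#0 switched from RUNNING to FAILED.",
                "2024-12-01 12:12:21,964 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task IntervalJoin (49/80)#0");
        // Prints "closed before cancel: true"
        System.out.println("closed before cancel: " + closedBeforeCancel(lines));
        // Prints "gap between close and failure: 9163 ms"
        System.out.println("gap between close and failure: " + gapMillis(lines.get(0), lines.get(1)) + " ms");
    }
}
{code}

On a real log, the lines would first need to be filtered by the failing task's attempt ID (here 3e1d0680c28b9bc1e58fc5eaf341e6a9), because cancellation messages for other subtasks are interleaved in the same file.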