Feifan Wang created FLINK-37318:
-----------------------------------

             Summary: RocksDBKeyedStateBackend disposed before task exit
                 Key: FLINK-37318
                 URL: https://issues.apache.org/jira/browse/FLINK-37318
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.12.2
            Reporter: Feifan Wang


We encountered two cases in the same job where RocksDBKeyedStateBackend was 
disposed before the task was canceled, as shown in the following logs:
{code:java}
2024-12-01 12:12:12,703 INFO  
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Closed 
RocksDB State Backend. Cleaning up RocksDB working directory 
/data4/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1725359226161_2973711/flink-io-a9a5e6ba-f582-40e0-a6f1-f4221911de28/job_34a8565e51c2c2e22bae59f537e5775f_op_KeyedProcessOperator_174cfe0875f20e2c391c93234c3d8810__65_80__uuid_01262a91-5f77-4f57-9cc1-edf586a5d799.
2024-12-01 12:12:21,866 WARN  org.apache.flink.runtime.taskmanager.Task         
            - OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time ASC], 
window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, imei, 
idfa, mac_address, client_ip, device_model, os_version, os, extension, 
stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 
(3e1d0680c28b9bc1e58fc5eaf341e6a9) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Error while deserializing the user 
key.
        at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:486)
        at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$1.next(RocksDBMapState.java:197)
        at 
org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:203)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
        at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
        at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:219)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:400)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:629)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:767)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:578)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
        at 
org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:233)
        at 
org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:72)
        at 
org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:30)
        at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:380)
        at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:64)
        at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:483)
        ... 20 more
2024-12-01 12:12:21,868 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Freeing task resources for OverAggregate(partitionBy=[dpid, $18], 
orderBy=[stat_time ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], sele...id, imei, idfa, mac_address, client_ip, device_model, os_version, 
os, extension, stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 
(3e1d0680c28b9bc1e58fc5eaf341e6a9).
2024-12-01 12:12:21,895 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering 
task and sending final execution state FAILED to JobManager for task 
OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time ASC], window=[ ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, imei, idfa, 
mac_address, client_ip, device_model, os_version, os, extension, stat_time, 
w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 3e1d0680c28b9bc1e58fc5eaf341e6a9.
2024-12-01 12:12:21,964 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], 
windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, 
leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, 
extension, stat_time, session_id0, event_timestamp0, page_identifier0]) 
(49/80)#0 (a533508520a4898f3658291d2eb7d509).
2024-12-01 12:12:21,964 INFO  org.apache.flink.runtime.taskmanager.Task         
            - IntervalJoin(joinType=[LeftOuterJoin], 
windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, 
leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, 
extension, stat_time, session_id0, event_timestamp0, page_identifier0]) 
(49/80)#0 (a533508520a4898f3658291d2eb7d509) switched from RUNNING to CANCELING.
2024-12-01 12:12:21,964 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Triggering cancellation of task code 
IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, 
client_ip, device_model, os_version, os, extension, stat_time, session_id0, 
event_timestamp0, page_identifier0]) (49/80)#0 
(a533508520a4898f3658291d2eb7d509).
2024-12-01 12:12:21,971 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], 
windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, 
leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, 
extension, stat_time, session_id0, event_timestamp0, page_identifier0]) 
(65/80)#0 (54c43203bab937f639cf9564425bface).
2024-12-01 12:12:21,971 INFO  org.apache.flink.runtime.taskmanager.Task         
            - IntervalJoin(joinType=[LeftOuterJoin], 
windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, 
leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, 
extension, stat_time, session_id0, event_timestamp0, page_identifier0]) 
(65/80)#0 (54c43203bab937f639cf9564425bface) switched from RUNNING to CANCELING.
2024-12-01 12:12:21,971 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Triggering cancellation of task code 
IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, 
client_ip, device_model, os_version, os, extension, stat_time, session_id0, 
event_timestamp0, page_identifier0]) (65/80)#0 
(54c43203bab937f639cf9564425bface).
2024-12-01 12:12:21,972 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], 
windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, 
leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, 
extension, stat_time, session_id0, event_timestamp0, page_identifier0]) 
(17/80)#0 (51c0a203e2dce0cceb2da1a1f992cd69).
2024-12-01 12:12:21,972 INFO  org.apache.flink.runtime.taskmanager.Task         
            - IntervalJoin(joinType=[LeftOuterJoin], 
windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, 
leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, 
extension, stat_time, session_id0, event_timestamp0, page_identifier0]) 
(17/80)#0 (51c0a203e2dce0cceb2da1a1f992cd69) switched from RUNNING to CANCELING.
2024-12-01 12:12:21,972 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Triggering cancellation of task code 
IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, 
client_ip, device_model, os_version, os, extension, stat_time, session_id0, 
event_timestamp0, page_identifier0]) (17/80)#0 
(51c0a203e2dce0cceb2da1a1f992cd69).
2024-12-01 12:12:21,973 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], 
windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, 
leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, 
extension, stat_time, session_id0, event_timestamp0, page_identifier0]) 
(33/80)#0 (da848644a3859496193cbe46a3730bfd).
 {code}
Please note that the log contains no task-cancellation message before the 
"Closed RocksDB State Backend" message at "2024-12-01 12:12:12,703". The 
RocksDBMapState deserialization exception then appears about 9 seconds later, 
at "2024-12-01 12:12:21,866".
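
For anyone who wants to check other TaskManager logs for the same ordering, here is a minimal sketch of the check described above. It assumes unwrapped log lines (one message per line) that start with a timestamp in the layout shown in the excerpt. The class and method names are hypothetical helpers, not part of Flink, and the sample lines in main are shortened stand-ins for the real messages.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: checks message ordering in a log.
class DisposeOrderCheck {
    // Timestamp layout used in the log excerpt, e.g. "2024-12-01 12:12:12,703".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Index of the first line containing the marker, or -1 if there is none. */
    static int firstIndexOf(List<String> lines, String marker) {
        for (int i = 0; i < lines.size(); i++) {
            if (lines.get(i).contains(marker)) {
                return i;
            }
        }
        return -1;
    }

    /** True if the backend was closed and no cancel message precedes that line. */
    static boolean closedBeforeCancel(List<String> lines) {
        int closed = firstIndexOf(lines, "Closed RocksDB State Backend");
        int cancel = firstIndexOf(lines, "Attempting to cancel task");
        return closed >= 0 && (cancel < 0 || closed < cancel);
    }

    /** Milliseconds between the leading timestamps of two log lines. */
    static long gapMillis(String earlier, String later) {
        // The timestamp prefix is 23 characters long.
        LocalDateTime a = LocalDateTime.parse(earlier.substring(0, 23), TS);
        LocalDateTime b = LocalDateTime.parse(later.substring(0, 23), TS);
        return Duration.between(a, b).toMillis();
    }

    public static void main(String[] args) {
        // Shortened stand-ins for the three relevant messages in the excerpt.
        List<String> lines = Arrays.asList(
                "2024-12-01 12:12:12,703 INFO  RocksDBKeyedStateBackend - Closed RocksDB State Backend.",
                "2024-12-01 12:12:21,866 WARN  Task - switched from RUNNING to FAILED.",
                "2024-12-01 12:12:21,964 INFO  Task - Attempting to cancel task");
        System.out.println(closedBeforeCancel(lines));
        System.out.println(gapMillis(lines.get(0), lines.get(1)));
    }
}
```

The check only compares line positions, so it relies on the log being in chronological order. It does not explain why the backend was closed.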

This looks like a bug. As a first step, I want to understand why 
RocksDBKeyedStateBackend was disposed. Can anyone help?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
