[ 
https://issues.apache.org/jira/browse/FLINK-37318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931381#comment-17931381
 ] 

Nil Madhab commented on FLINK-37318:
------------------------------------

I am new to Flink and would like to contribute. Could you please share more 
details about the job so that I can reproduce the issue?

> RocksDBKeyedStateBackend disposed before task exit
> --------------------------------------------------
>
>                 Key: FLINK-37318
>                 URL: https://issues.apache.org/jira/browse/FLINK-37318
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.12.2
>            Reporter: Feifan Wang
>            Priority: Major
>
> We encountered two cases in the same job where RocksDBKeyedStateBackend was 
> disposed before the task was cancelled, as shown in the following logs:
> {code:java}
> 2024-12-01 12:12:12,703 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Closed 
> RocksDB State Backend. Cleaning up RocksDB working directory 
> /data4/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1725359226161_2973711/flink-io-a9a5e6ba-f582-40e0-a6f1-f4221911de28/job_34a8565e51c2c2e22bae59f537e5775f_op_KeyedProcessOperator_174cfe0875f20e2c391c93234c3d8810__65_80__uuid_01262a91-5f77-4f57-9cc1-edf586a5d799.
> 2024-12-01 12:12:21,866 WARN  org.apache.flink.runtime.taskmanager.Task       
>               - OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time 
> ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, 
> imei, idfa, mac_address, client_ip, device_model, os_version, os, extension, 
> stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 
> (3e1d0680c28b9bc1e58fc5eaf341e6a9) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> user key.
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:486)
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$1.next(RocksDBMapState.java:197)
>         at 
> org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:203)
>         at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
>         at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
>         at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
>         at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
>         at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
>         at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:219)
>         at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
>         at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
>         at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
>         at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:400)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:629)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:767)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:578)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
>         at 
> org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:233)
>         at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:72)
>         at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:30)
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:380)
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:64)
>         at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:483)
>         ... 20 more
> 2024-12-01 12:12:21,868 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Freeing task resources for OverAggregate(partitionBy=[dpid, 
> $18], orderBy=[stat_time ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND 
> CURRENT ROW], sele...id, imei, idfa, mac_address, client_ip, device_model, 
> os_version, os, extension, stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) 
> (65/80)#0 (3e1d0680c28b9bc1e58fc5eaf341e6a9).
> 2024-12-01 12:12:21,895 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - 
> Un-registering task and sending final execution state FAILED to JobManager 
> for task OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time ASC], 
> window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, imei, 
> idfa, mac_address, client_ip, device_model, os_version, os, extension, 
> stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 
> 3e1d0680c28b9bc1e58fc5eaf341e6a9.
> 2024-12-01 12:12:21,964 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task 
> IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
> leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, 
> client_ip, device_model, os_version, os, extension, stat_time, session_id0, 
> event_timestamp0, page_identifier0]) (49/80)#0 
> (a533508520a4898f3658291d2eb7d509).
> 2024-12-01 12:12:21,964 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - IntervalJoin(joinType=[LeftOuterJoin], 
> windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, 
> leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, 
> extension, stat_time, session_id0, event_timestamp0, page_identifier0]) 
> (49/80)#0 (a533508520a4898f3658291d2eb7d509) switched from RUNNING to 
> CANCELING.
> 2024-12-01 12:12:21,964 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code 
> IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
> leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, 
> client_ip, device_model, os_version, os, extension, stat_time, session_id0, 
> event_timestamp0, page_identifier0]) (49/80)#0 
> (a533508520a4898f3658291d2eb7d509).
> 2024-12-01 12:12:21,971 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task 
> IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
> leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, 
> client_ip, device_model, os_version, os, extension, stat_time, session_id0, 
> event_timestamp0, page_identifier0]) (65/80)#0 
> (54c43203bab937f639cf9564425bface).
> 2024-12-01 12:12:21,971 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - IntervalJoin(joinType=[LeftOuterJoin], 
> windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, 
> leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, 
> extension, stat_time, session_id0, event_timestamp0, page_identifier0]) 
> (65/80)#0 (54c43203bab937f639cf9564425bface) switched from RUNNING to 
> CANCELING.
> 2024-12-01 12:12:21,971 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code 
> IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
> leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, 
> client_ip, device_model, os_version, os, extension, stat_time, session_id0, 
> event_timestamp0, page_identifier0]) (65/80)#0 
> (54c43203bab937f639cf9564425bface).
> 2024-12-01 12:12:21,972 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task 
> IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
> leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, 
> client_ip, device_model, os_version, os, extension, stat_time, session_id0, 
> event_timestamp0, page_identifier0]) (17/80)#0 
> (51c0a203e2dce0cceb2da1a1f992cd69).
> 2024-12-01 12:12:21,972 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - IntervalJoin(joinType=[LeftOuterJoin], 
> windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, 
> leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, 
> extension, stat_time, session_id0, event_timestamp0, page_identifier0]) 
> (17/80)#0 (51c0a203e2dce0cceb2da1a1f992cd69) switched from RUNNING to 
> CANCELING.
> 2024-12-01 12:12:21,972 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code 
> IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
> leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, 
> client_ip, device_model, os_version, os, extension, stat_time, session_id0, 
> event_timestamp0, page_identifier0]) (17/80)#0 
> (51c0a203e2dce0cceb2da1a1f992cd69).
> 2024-12-01 12:12:21,973 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task 
> IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, 
> leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, 
> client_ip, device_model, os_version, os, extension, stat_time, session_id0, 
> event_timestamp0, page_identifier0]) (33/80)#0 
> (da848644a3859496193cbe46a3730bfd).
>  {code}
> Please note that the log contains no task cancellation message before the 
> "Closed RocksDB State Backend" message at "2024-12-01 12:12:12,703". The 
> RocksDBMapState deserialization exception then appears about 9 seconds later.
> This looks like a bug. I first want to understand why 
> RocksDBKeyedStateBackend was disposed. Can anyone help?
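
For anyone trying to reproduce this: the "Caused by" part of the stack trace shows LongSerializer.deserialize failing inside readLong with an EOFException, which suggests the bytes handed to the deserializer were shorter than the 8 bytes a long needs. The sketch below only illustrates that failure mode. It uses the JDK's java.io.DataInputStream as a stand-in for Flink's DataInputDeserializer, and the class and method names are hypothetical. It does not show that a disposed backend is what produced the short key; that link is still an open question in this issue.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical demo class; not part of Flink.
public class TruncatedKeyDemo {

    /**
     * Tries to read one 8-byte long from the given bytes.
     * Returns true if the read hit end-of-input, false if it succeeded.
     */
    static boolean readLongHitsEof(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readLong(); // needs exactly 8 bytes
            return false;
        } catch (EOFException e) {
            // Fewer than 8 bytes were available, as in the reported stack trace.
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("8 bytes -> EOF? " + readLongHitsEof(new byte[8]));
        System.out.println("3 bytes -> EOF? " + readLongHitsEof(new byte[3]));
        System.out.println("0 bytes -> EOF? " + readLongHitsEof(new byte[0]));
    }
}
```

If a reproduction job can log the raw key length just before deserialization, comparing it against 8 would help confirm or rule out this reading of the trace.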



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
