[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924691#comment-17924691
 ] 

Tommy Schnabel commented on FLINK-23886:
----------------------------------------

I don't know if it helps, but we've ran into this issue a few times. Each time 
was the first restart after exceeding ~30TB of state. We've reliably been able 
to predict when our new clusters run into this issue during the cluster's first 
restart at such large state size. After the first restart where we run into 
this corrupted timer state, we do not run into this issue

> An exception is thrown out when recover job timers from checkpoint file
> -----------------------------------------------------------------------
>
>                 Key: FLINK-23886
>                 URL: https://issues.apache.org/jira/browse/FLINK-23886
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.10.0, 1.11.3, 1.13.2, 1.13.5, 1.14.4
>            Reporter: Jing Zhang
>            Assignee: Yuan Mei
>            Priority: Major
>         Attachments: image-2021-08-25-16-38-04-023.png, 
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, 
> image-2021-08-25-17-07-38-327.png, image-2024-07-08-16-55-53-243.png, 
> segment-drop-corrupted-timer-state.diff
>
>
> A user report the bug in the [mailist. 
> |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I
>  paste the content here.
> Setup Specifics:
>  Version: 1.6.2
>  RocksDB Map State
>  Timers stored in rocksdb
>   
>  When we have this job running for long periods of time like > 30 days, if 
> for some reason the job restarts, we encounter "Error while deserializing the 
> element". Is this a known issue fixed in later versions? I see some changes 
> to code for FLINK-10175, but we don't use any queryable state 
>   
>  Below is the stack trace
>   
>  org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> element.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
> at 
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
> at 
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
> ... 20 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to