[ https://issues.apache.org/jira/browse/FLINK-20654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251864#comment-17251864 ]
Arvid Heise commented on FLINK-20654:
-------------------------------------

For my investigation, I added a number of INFO log statements to track which buffers are written: https://github.com/AHeise/flink/tree/FLINK-20654

For [Parallel union, p = 5], I noticed that the issue arises only when multiple buffers of the same channel are recovered. In the excerpt below, Flat Map (4/5)#5 recovers several 4096-byte buffers for InputChannelInfo{gateIdx=0, inputChannelIdx=3} and then fails with an ArithmeticException:

{noformat}
19749 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 8 bytes
19749 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=1, inputChannelIdx=3} recovered 9 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=2, inputChannelIdx=3} recovered 1 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=3, inputChannelIdx=3} recovered 14 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4 bytes
19750 [Flat Map (4/5)#5] INFO org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=1, inputChannelIdx=3} prepareSnapshot 9 bytes
19750 [Flat Map (4/5)#5] INFO org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=2, inputChannelIdx=3} prepareSnapshot 1 bytes
19750 [Flat Map (4/5)#5] INFO org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=3, inputChannelIdx=3} prepareSnapshot 14 bytes
19750 [Flat Map (1/5)#5] INFO org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=0, inputChannelIdx=0} prepareSnapshot 17 bytes
19750 [Flat Map (1/5)#5] INFO org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=1, inputChannelIdx=0} prepareSnapshot 13 bytes
19750 [Flat Map (1/5)#5] INFO org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=2, inputChannelIdx=0} prepareSnapshot 18 bytes
19750 [Flat Map (1/5)#5] INFO org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} prepareSnapshot 14 bytes
19751 [Flat Map (1/5)#5] INFO org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19751 [Flat Map (1/5)#5] INFO org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19751 [Flat Map (1/5)#5] INFO org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (1/5)#5] INFO org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (1/5)#5] INFO org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (1/5)#5] INFO org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Channel state writer Flat Map (4/5)#5] INFO org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl [] - Flat Map (4/5)#5 discarding 0 drained requests
19752 [Flat Map (1/5)#5] INFO org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (4/5)#5] INFO org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl [] - Flat Map (4/5)#5 discarding 1 drained requests
19753 [Source: source1 (1/5)#5] INFO org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase [] - Snapshotted LongSplit{increment=5, nextNumber=21525, numCompletedCheckpoints=4} @ 0 subtask (5 attempt)
19753 [Source: source1 (4/5)#5] INFO org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase [] - Snapshotted LongSplit{increment=5, nextNumber=21528, numCompletedCheckpoints=4} @ 3 subtask (5 attempt)
19753 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (3/5)#5 (0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=0, inputChannelIdx=2} recovered 2 bytes
19753 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (3/5)#5 (0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=1, inputChannelIdx=2} recovered 12 bytes
19753 [AsyncOperations-thread-1] INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Flat Map (4/5)#5 - asynchronous part of checkpoint 11 could not be completed.
java.util.concurrent.CancellationException: null
	at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_222]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_222]
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:583) ~[classes/:?]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:59) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:115) [classes/:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
19753 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (3/5)#5 (0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=2, inputChannelIdx=2} recovered 4 bytes
19753 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (3/5)#5 (0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=3, inputChannelIdx=2} recovered 14 bytes
19753 [channel-state-unspilling-thread-1] INFO org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (3/5)#5 (0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=2, inputChannelIdx=2} recovered 4096 bytes
19753 [Flat Map (4/5)#5] WARN org.apache.flink.runtime.taskmanager.Task [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79) switched from RUNNING to FAILED.
java.lang.ArithmeticException: integer overflow
	at java.lang.Math.toIntExact(Math.java:1011) ~[?:1.8.0_222]
	at java.lang.StrictMath.toIntExact(StrictMath.java:813) ~[?:1.8.0_222]
	at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase$CountingMapFunction.flatMap(UnalignedCheckpointITCase.java:414) ~[test-classes/:?]
	at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase$CountingMapFunction.flatMap(UnalignedCheckpointITCase.java:401) ~[test-classes/:?]
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:185) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[classes/:?]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [classes/:?]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [classes/:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
{noformat}

> Unaligned checkpoint recovery may lead to corrupted data stream
> ---------------------------------------------------------------
>
>                 Key: FLINK-20654
>                 URL: https://issues.apache.org/jira/browse/FLINK-20654
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: Arvid Heise
>            Priority: Major
>             Fix For: 1.13.0, 1.12.1
>
>
> The fix of FLINK-20433 shows potential corruption after recovery for all
> variations of UnalignedCheckpointITCase.
> To reproduce, run UCITCase a couple of hundred times. The issue showed up for
> me in the following cases, with decreasing frequency:
> - execute [Parallel union, p = 5]
> - execute [Parallel union, p = 10]
> - execute [Parallel cogroup, p = 5]
> - execute [parallel pipeline with remote channels, p = 5]
> The issue manifests as one of the following symptoms:
> - stream corrupted exception
> - EOF exception
> - assertion failure in NUM_LOST or NUM_OUT_OF_ORDER
> - (for union) ArithmeticException overflow (because a number that should lie
> in [0;100000] has been mis-deserialized)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
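The last symptom can be reproduced in isolation. The following minimal Java sketch is hypothetical and is not Flink's record serializer or recovery code. It assumes a simplified framing in which every record is a plain 8-byte big-endian long. It cuts the serialized stream into small chunks that stand in for network buffers, then simulates one possible fault: two recovered buffers of the same channel being replayed out of order. Because a record can span a buffer boundary, reordering the buffers shifts the read offset. A value that should lie in [0;100000] is then decoded from unrelated bytes. Narrowing that value with `Math.toIntExact`, similar to the `toIntExact` call in the stack trace, fails with "integer overflow". The class and method names (`MisDeserializationSketch`, `serialize`, `split`, `deserialize`) are illustrative only.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only: not Flink's serializer or recovery logic.
public class MisDeserializationSketch {

    // Writes each value as an 8-byte big-endian long (simplified framing; an assumption).
    static byte[] serialize(long... values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * Long.BYTES);
        for (long v : values) {
            buf.putLong(v);
        }
        return buf.array();
    }

    // Cuts the stream into chunks of at most chunkSize bytes, standing in for network buffers.
    static List<byte[]> split(byte[] data, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int off = 0; off < data.length; off += chunkSize) {
            int len = Math.min(chunkSize, data.length - off);
            byte[] chunk = new byte[len];
            System.arraycopy(data, off, chunk, 0, len);
            chunks.add(chunk);
        }
        return chunks;
    }

    // Concatenates the chunks in list order, decodes every complete long,
    // and narrows it to int with Math.toIntExact.
    static List<Integer> deserialize(List<byte[]> chunks) {
        int total = 0;
        for (byte[] c : chunks) {
            total += c.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] c : chunks) {
            buf.put(c);
        }
        buf.flip();
        List<Integer> out = new ArrayList<>();
        while (buf.remaining() >= Long.BYTES) {
            // Throws ArithmeticException("integer overflow") if the decoded long exceeds int range.
            out.add(Math.toIntExact(buf.getLong()));
        }
        return out;
    }

    public static void main(String[] args) {
        // 16 bytes split into chunks of 5, 5, 5 and 1 bytes; records straddle chunk boundaries.
        List<byte[]> chunks = split(serialize(1L, 2L), 5);
        System.out.println(deserialize(chunks)); // prints [1, 2]

        // Replay the first two buffers of the "channel" out of order.
        List<byte[]> reordered = new ArrayList<>(chunks);
        Collections.swap(reordered, 0, 1);
        try {
            deserialize(reordered);
        } catch (ArithmeticException e) {
            System.out.println("mis-deserialized: " + e.getMessage()); // prints "mis-deserialized: integer overflow"
        }
    }
}
```

After the swap, the first eight bytes decode to 2^40, which is far outside the int range. Dropping or duplicating a chunk would generally misalign the stream in a similar way. This sketch does not establish which of these faults, if any, matches the actual bug.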