[ 
https://issues.apache.org/jira/browse/FLINK-20654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251864#comment-17251864
 ] 

Arvid Heise commented on FLINK-20654:
-------------------------------------

For my investigation, I added a number of INFO log statements to track which 
buffers are written:
https://github.com/AHeise/flink/tree/FLINK-20654

For [Parallel union, p = 5], I noticed that the issue arises only when 
multiple buffers of the same channel are recovered.


{noformat}
19749 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 8 bytes
19749 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=1, 
inputChannelIdx=3} recovered 9 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=2, 
inputChannelIdx=3} recovered 1 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=3, 
inputChannelIdx=3} recovered 14 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (4/5)#5 
(de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=3} recovered 4 bytes
19750 [Flat Map (4/5)#5] INFO  
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - 
InputChannelInfo{gateIdx=1, inputChannelIdx=3} prepareSnapshot 9 bytes
19750 [Flat Map (4/5)#5] INFO  
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - 
InputChannelInfo{gateIdx=2, inputChannelIdx=3} prepareSnapshot 1 bytes
19750 [Flat Map (4/5)#5] INFO  
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - 
InputChannelInfo{gateIdx=3, inputChannelIdx=3} prepareSnapshot 14 bytes
19750 [Flat Map (1/5)#5] INFO  
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - 
InputChannelInfo{gateIdx=0, inputChannelIdx=0} prepareSnapshot 17 bytes
19750 [Flat Map (1/5)#5] INFO  
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - 
InputChannelInfo{gateIdx=1, inputChannelIdx=0} prepareSnapshot 13 bytes
19750 [Flat Map (1/5)#5] INFO  
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - 
InputChannelInfo{gateIdx=2, inputChannelIdx=0} prepareSnapshot 18 bytes
19750 [Flat Map (1/5)#5] INFO  
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - 
InputChannelInfo{gateIdx=3, inputChannelIdx=0} prepareSnapshot 14 bytes
19751 [Flat Map (1/5)#5] INFO  
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] 
- InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19751 [Flat Map (1/5)#5] INFO  
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] 
- InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19751 [Flat Map (1/5)#5] INFO  
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] 
- InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (1/5)#5] INFO  
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] 
- InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (1/5)#5] INFO  
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] 
- InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (1/5)#5] INFO  
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] 
- InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Channel state writer Flat Map (4/5)#5] INFO  
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
 [] - Flat Map (4/5)#5 discarding 0 drained requests
19752 [Flat Map (1/5)#5] INFO  
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] 
- InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (4/5)#5] INFO  
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
 [] - Flat Map (4/5)#5 discarding 1 drained requests
19753 [Source: source1 (1/5)#5] INFO  
org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase [] - 
Snapshotted LongSplit{increment=5, nextNumber=21525, numCompletedCheckpoints=4} 
@ 0 subtask (5 attempt)
19753 [Source: source1 (4/5)#5] INFO  
org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase [] - 
Snapshotted LongSplit{increment=5, nextNumber=21528, numCompletedCheckpoints=4} 
@ 3 subtask (5 attempt)
19753 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (3/5)#5 
(0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=0, 
inputChannelIdx=2} recovered 2 bytes
19753 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (3/5)#5 
(0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=1, 
inputChannelIdx=2} recovered 12 bytes
19753 [AsyncOperations-thread-1] INFO  
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Flat Map 
(4/5)#5 - asynchronous part of checkpoint 11 could not be completed.
java.util.concurrent.CancellationException: null
        at java.util.concurrent.FutureTask.report(FutureTask.java:121) 
~[?:1.8.0_222]
        at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
~[?:1.8.0_222]
        at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:583)
 ~[classes/:?]
        at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:59)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:115)
 [classes/:?]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_222]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_222]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
19753 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (3/5)#5 
(0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=2, 
inputChannelIdx=2} recovered 4 bytes
19753 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (3/5)#5 
(0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=3, 
inputChannelIdx=2} recovered 14 bytes
19753 [channel-state-unspilling-thread-1] INFO  
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] 
- Flat Map (3/5)#5 
(0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=2, 
inputChannelIdx=2} recovered 4096 bytes
19753 [Flat Map (4/5)#5] WARN  org.apache.flink.runtime.taskmanager.Task [] - 
Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79) switched from RUNNING to 
FAILED.
java.lang.ArithmeticException: integer overflow
        at java.lang.Math.toIntExact(Math.java:1011) ~[?:1.8.0_222]
        at java.lang.StrictMath.toIntExact(StrictMath.java:813) ~[?:1.8.0_222]
        at 
org.apache.flink.test.checkpointing.UnalignedCheckpointITCase$CountingMapFunction.flatMap(UnalignedCheckpointITCase.java:414)
 ~[test-classes/:?]
        at 
org.apache.flink.test.checkpointing.UnalignedCheckpointITCase$CountingMapFunction.flatMap(UnalignedCheckpointITCase.java:401)
 ~[test-classes/:?]
        at 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:185)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
 ~[classes/:?]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) 
~[classes/:?]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) 
[classes/:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) 
[classes/:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
{noformat}
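
To check the "multiple buffers of the same channel" observation against a log like the one above, the "recovered N bytes" messages can be counted per channel. The following is only a minimal sketch: the class and method names are hypothetical, the regex assumes the message format of the info statements I added, and the key ignores the subtask, so it should be fed the log of one subtask at a time.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: counts recovered buffers per input channel.
public class RecoveredBufferCounter {
    // \s also matches the line breaks that the mail client inserted into the log lines.
    private static final Pattern RECOVERED = Pattern.compile(
            "InputChannelInfo\\{gateIdx=(\\d+),\\s*inputChannelIdx=(\\d+)\\}"
                    + "\\s+recovered\\s+(\\d+)\\s+bytes");

    /** Returns the number of recovered buffers per "gateIdx/inputChannelIdx" key. */
    public static Map<String, Integer> countRecoveredBuffers(String log) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        Matcher m = RECOVERED.matcher(log);
        while (m.find()) {
            counts.merge(m.group(1) + "/" + m.group(2), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Shortened sample in the format of the log above.
        String log =
                "InputChannelInfo{gateIdx=0, \ninputChannelIdx=3} recovered 8 bytes\n"
                        + "InputChannelInfo{gateIdx=1, \ninputChannelIdx=3} recovered 9 bytes\n"
                        + "InputChannelInfo{gateIdx=0, \ninputChannelIdx=3} recovered 4096 bytes\n";
        // Channels with a count above 1 are the ones where the issue was observed.
        countRecoveredBuffers(log).forEach((channel, count) ->
                System.out.println(channel + " -> " + count + " buffer(s)"));
    }
}
```

Any channel reported with more than one buffer is a candidate for the corruption described here.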


> Unaligned checkpoint recovery may lead to corrupted data stream
> ---------------------------------------------------------------
>
>                 Key: FLINK-20654
>                 URL: https://issues.apache.org/jira/browse/FLINK-20654
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: Arvid Heise
>            Priority: Major
>             Fix For: 1.13.0, 1.12.1
>
>
> The fix for FLINK-20433 shows potential corruption after recovery for all 
> variations of UnalignedCheckpointITCase.
> To reproduce, run UCITCase a couple hundred times. The issue showed up for me 
> in:
> - execute [Parallel union, p = 5]
> - execute [Parallel union, p = 10]
> - execute [Parallel cogroup, p = 5]
> - execute [parallel pipeline with remote channels, p = 5]
> with decreasing frequency.
> The issue manifests in one of the following ways:
> - stream corrupted exception
> - EOF exception
> - assertion failure in NUM_LOST or NUM_OUT_OF_ORDER
> - (for union) ArithmeticException overflow (because the number that should be 
> [0;100000] has been mis-deserialized)
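
To illustrate the last symptom: a small value that is read from the wrong byte offset can easily exceed the int range, at which point Math.toIntExact throws the "integer overflow" seen in the stack trace. The sketch below is an illustration only and assumes 8-byte big-endian longs; the class name and byte layout are made up and do not reproduce Flink's actual serialization path.

```java
import java.nio.ByteBuffer;

// Hypothetical demo, not Flink code: a misaligned read turns small values into a huge one.
public class MisalignedReadDemo {
    /** Reads a big-endian long starting at the given absolute byte offset. */
    public static long readLongAt(byte[] bytes, int offset) {
        return ByteBuffer.wrap(bytes).getLong(offset);
    }

    public static void main(String[] args) {
        // Two values inside the expected range, written back to back.
        byte[] bytes = ByteBuffer.allocate(16).putLong(42L).putLong(99_999L).array();

        long aligned = readLongAt(bytes, 0); // correct record boundary
        long shifted = readLongAt(bytes, 4); // boundary off by 4 bytes

        System.out.println("aligned: " + Math.toIntExact(aligned));
        try {
            Math.toIntExact(shifted);
        } catch (ArithmeticException e) {
            // The shifted value no longer fits into an int.
            System.out.println("shifted: " + shifted + " -> " + e.getMessage());
        }
    }
}
```

A shifted boundary does not always overflow; some offsets yield a plausible-looking but wrong number, which would fit the NUM_LOST and NUM_OUT_OF_ORDER assertion failures listed above.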



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
