[ 
https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899156#comment-17899156
 ] 

Arvid Heise commented on FLINK-36743:
-------------------------------------

Hey Feifan, it's a bit hard to debug remotely, and it's been a while since 
I worked on that code, but I'll try my best to help.

You mentioned that you were able to find the in-flight data which is great. 
That means that you deeply understood the structure. Could you share some 
background on the partitioning used?

It's also great that you have one checkpoint that works and one that 
doesn't. Can you please make sure that you persist the data elsewhere until 
the investigation has concluded? We may never have such a good setup again to 
find the root cause.

If possible, can you check if the rescaling of the good checkpoint also uses 
the same subtask mappings? Is this issue reproducible on each rescaling attempt?



 
{noformat}
Cannot select SubtaskConnectionDescriptor{inputSubtaskIndex=0, 
outputSubtaskIndex=4071}; known channels are 
[SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=0}, 
SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
outputSubtaskIndex=4200}]{noformat}
One possibility is that we have some rounding errors in the remapping that 
become visible only at higher DOP (degree of parallelism). However, the 
expected index of 4071 is pretty far off the available indexes 0 and 4200. The 
input index (0 instead of 1357) is also completely off.

Another possibility is that we end up with shifted byte streams for some 
reason, but the subtask descriptor looks too normal for that.

So before going deeper, I'd like to understand which partitioner is used 
between upstream and downstream and the DOP of the respective tasks. Then I can 
manually compute whether the event or the downstream state is incorrect. If you 
have a way to check whether the event indeed comes from upstream subtask=0, 
that would be neat.
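
To make the manual computation concrete, here is a rough sketch of how one could list the old subtasks whose data a new subtask would be expected to receive. It assumes a keyed, range-style assignment in which a fixed number of units (think key groups) is split contiguously across subtasks. The class name, method names, and the unit count of 16800 are hypothetical, and this is a simplified model rather than Flink's actual remapping code; other partitioners would need a different calculation.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified range-overlap model for rescaling; NOT Flink's actual recovery code. */
class RescaleMappingSketch {

    /** First unit owned by subtask {@code index} when {@code units} are split over {@code parallelism}. */
    static int rangeStart(int units, int parallelism, int index) {
        return (int) (((long) index * units + parallelism - 1) / parallelism);
    }

    /** Last unit (inclusive) owned by subtask {@code index}. */
    static int rangeEnd(int units, int parallelism, int index) {
        return (int) ((((long) index + 1) * units - 1) / parallelism);
    }

    /** Old subtask indexes whose unit range overlaps the range of {@code newIndex}. */
    static List<Integer> oldSubtasksFor(int units, int oldParallelism, int newParallelism, int newIndex) {
        // Assumption: at least one unit per subtask, so no range is empty.
        if (units < oldParallelism || units < newParallelism) {
            throw new IllegalArgumentException("units must be >= both parallelisms");
        }
        int start = rangeStart(units, newParallelism, newIndex);
        int end = rangeEnd(units, newParallelism, newIndex);
        List<Integer> result = new ArrayList<>();
        for (int old = 0; old < oldParallelism; old++) {
            int oldStart = rangeStart(units, oldParallelism, old);
            int oldEnd = rangeEnd(units, oldParallelism, old);
            if (oldStart <= end && start <= oldEnd) {
                result.add(old);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int units = 16800; // hypothetical unit count, not taken from the job
        // Old subtasks (parallelism 5600) overlapping new subtask 4071 (parallelism 4200).
        System.out.println(oldSubtasksFor(units, 5600, 4200, 4071));
    }
}
```

If the index reported in the exception is not in the list computed for the actual partitioner and parallelism, that would point toward the event rather than the downstream state.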

From there we can see if the checkpoint is corrupted or the recovery code is 
bugged.

 

> Rescale from unaligned checkpoint failed
> ----------------------------------------
>
>                 Key: FLINK-36743
>                 URL: https://issues.apache.org/jira/browse/FLINK-36743
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Priority: Major
>
> We encountered the following exception when scaling down a job from 5600 to 
> 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> xxxxxx (1358/1400) 
> (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) 
> switched from RUNNING to FAILED on 
> container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select 
> SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; 
> known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=4200}]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code}
>  * Flink version : 1.16.1
>  * unaligned checkpoint : enabled
>  * log-based checkpoint : enabled
> The exception was encountered when restoring from chk-2718336, while 
> restoring from chk-2718333 succeeds. I checked the metadata files of 
> chk-2718336 and chk-2718333, and both of them contain in-flight data. It looks 
> like something goes wrong with the unaligned checkpoint when reassigning 
> in-flight data. Could you please help take a look? [~arvid], [~pnowojski] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
