[ https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899156#comment-17899156 ]
Arvid Heise commented on FLINK-36743:
-------------------------------------

Hey Feifan,

it's a bit hard to debug remotely, and it's been a while since I worked on that code, but I'll try my best to help.

You mentioned that you were able to find the in-flight data, which is great. It means that you understand the structure deeply. Could you share some background on the partitioning used?

It's also great that you have one checkpoint that works and one that doesn't. Can you please make sure that you persist the data elsewhere until the investigation has concluded? We may never have such a good setup again to find the root cause.

If possible, can you check whether rescaling from the good checkpoint uses the same subtask mappings? Is this issue reproducible on each rescaling attempt?

{noformat}
Cannot select SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=4200}]
{noformat}

One possibility is that we have some rounding errors in the remapping that only become visible at higher DOP. However, the expected index of 4071 is pretty far off the available indexes 0 and 4200. The input index is also completely off.

Another possibility is that we end up with shifted byte streams for some reason, but then the subtask descriptor looks too normal.

So before going deeper, I'd like to understand which partitioner is used between upstream and downstream, and the DOP of the respective tasks. Then I can manually compute whether the event is incorrect or the state in downstream is. If you have a way to check whether the event indeed comes from upstream subtask=0, that would be neat.

From there we can see whether the checkpoint is corrupted or the recovery code is bugged.
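To compare the requested descriptor with the known channels across several failed attempts, a small log helper along these lines might be handy. This is only a sketch and not Flink code: the function names `parse_cannot_select` and `describe_mismatch` are made up for illustration, and the script assumes nothing beyond the message format shown above. It makes no claim about what the two index fields mean.

```python
import re

# Matches one descriptor exactly as it is printed in the exception message.
# \s* tolerates line wraps introduced by log viewers or mail clients.
DESCRIPTOR = re.compile(
    r"SubtaskConnectionDescriptor\{inputSubtaskIndex=(\d+),\s*outputSubtaskIndex=(\d+)\}"
)


def parse_cannot_select(message):
    """Split a 'Cannot select ...; known channels are [...]' message into
    the requested (input, output) pair and the list of known pairs."""
    parts = re.split(r";\s*known channels are\s*", message, maxsplit=1)
    if len(parts) != 2:
        raise ValueError("message does not look like a 'Cannot select' error")
    requested = [(int(i), int(o)) for i, o in DESCRIPTOR.findall(parts[0])]
    known = [(int(i), int(o)) for i, o in DESCRIPTOR.findall(parts[1])]
    if len(requested) != 1:
        raise ValueError("expected exactly one requested descriptor")
    return requested[0], known


def describe_mismatch(requested, known):
    """Report which half of the requested descriptor is absent from the known channels."""
    known_inputs = sorted({i for i, _ in known})
    known_outputs = sorted({o for _, o in known})
    return {
        "input_known": requested[0] in known_inputs,
        "output_known": requested[1] in known_outputs,
        "known_inputs": known_inputs,
        "known_outputs": known_outputs,
    }


# The message from this ticket, copied verbatim.
MESSAGE = (
    "Cannot select SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; "
    "known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=0}, "
    "SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=4200}]"
)

requested, known = parse_cannot_select(MESSAGE)
report = describe_mismatch(requested, known)
print(requested, known, report)
```

Running it over the messages from each rescaling attempt would show whether the same descriptor is requested every time, which bears on the reproducibility question above.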
> Rescale from unaligend checkpoint failed
> ----------------------------------------
>
>                 Key: FLINK-36743
>                 URL: https://issues.apache.org/jira/browse/FLINK-36743
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Priority: Major
>
> We encountered the following exception when scaling down a job from 5600 to 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: xxxxxx (1358/1400) (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) switched from RUNNING to FAILED on container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=4200}]
>     at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
> {code}
> * Flink version : 1.16.1
> * unaligned checkpoint : enabled
> * log-based checkpoint : enabled
>
> The exception is encountered when restoring from chk-2718336, while the job restores successfully from chk-2718333. I checked the metadata files of chk-2718336 and chk-2718333; both contain in-flight data. It looks like something goes wrong when the in-flight data of the unaligned checkpoint is reassigned. Could you please take a look? [~arvid], [~pnowojski]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)