[ https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899674#comment-17899674 ]
Arvid Heise commented on FLINK-36743:
-------------------------------------

I'll try to dig a bit deeper because there may still be a general bug hidden here.

Some background first, because you looked at the state and couldn't find any channel state for the faulty checkpoint:
* Channel state is split into resultSubpartitionState on the upstream operator and input channel state on the downstream operator.
* You only looked at the downstream operator, but the faulty state is in the resultSubpartitionState of the upstream operator.
* Upstream connects to downstream over one physical channel and creates a few virtual channels, which map to the old physical channels.
* The error indicates that upstream tries to send data over a virtual channel that is unknown downstream.
* I need to figure out whether this issue is specific to rescale or to the mapping. If it's an issue with rescale, you probably need to disable the extensions on your job.
* The reason it worked most of the time is probably that you don't have much backpressure to begin with, so most channel state turns out empty (as seen in your screenshots). Backpressure first builds up on the input side and then propagates to the result partition side. During checkpoint alignment it recedes in the opposite order: first the result partitions are cleared, then the input channels. Unaligned checkpoints first wait a bit to see whether an aligned checkpoint would work. Once the alignment times out, the checkpoint turns into an unaligned checkpoint and persists the in-flight data. In most cases, with only a little backpressure, the result partition is already cleared while the input channels are still partially filled.
* In your case, you don't rescale the sink side, so the input side remains stable and just works. The issues arise in the subpartition reassignment.
* For some unknown reason, it looks like the faulty checkpoint has no state in the input channels but does have state in the subpartitions on the upstream operator.
  That is very odd, as explained above. My only explanation is a network issue where data couldn't be sent downstream for a limited time (limited, because the checkpoint barrier eventually went through).

The changes could work, but I really need to play around in my IDE to fully understand the implications. For now, I recommend disabling the changes (by unsetting the options) for that particular pipeline. At least on the respective RESCALE, it doesn't seem to do anything at all (look how empty the state is).

> Rescale from unaligend checkpoint failed
> ----------------------------------------
>
>                 Key: FLINK-36743
>                 URL: https://issues.apache.org/jira/browse/FLINK-36743
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Priority: Major
>         Attachments: Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch,
>                      image-2024-11-19-14-58-22-975.png,
>                      image-2024-11-19-17-27-55-387.png,
>                      image-2024-11-19-17-30-14-816.png
>
>
> We encountered the following exception when scaling down a job from 5600 to 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: xxxxxx (1358/1400) (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) switched from RUNNING to FAILED on container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=4200}]
>     at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
> {code}
> * Flink version: 1.16.1
> * unaligned checkpoint: enabled
> * log-based checkpoint: enabled
>
> The exception was encountered when restoring from chk-2718336, while the job restores successfully from chk-2718333. I checked the metadata files of chk-2718336 and chk-2718333, and both contain in-flight data. It looks like something goes wrong with the unaligned checkpoint when in-flight data is reassigned. Could you please help take a look? [~arvid], [~pnowojski]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
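
For readers following the stack trace above, here is a minimal sketch of the failure mode described in the comment: the downstream side only knows the virtual channels derived from the rescale mapping, and rejects data announced for any other one. This is an illustration under stated assumptions, not Flink's actual code. The classes `VirtualChannelSketch`, `Descriptor`, and `Demultiplexer` are hypothetical stand-ins for `SubtaskConnectionDescriptor` and `DemultiplexingRecordDeserializer`; only the descriptor values and the message format are taken from the log.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical, simplified model of virtual-channel selection; not Flink's implementation. */
public class VirtualChannelSketch {

    /** Stand-in for SubtaskConnectionDescriptor: identifies one virtual channel by old subtask indexes. */
    static final class Descriptor {
        final int inputSubtaskIndex;
        final int outputSubtaskIndex;

        Descriptor(int inputSubtaskIndex, int outputSubtaskIndex) {
            this.inputSubtaskIndex = inputSubtaskIndex;
            this.outputSubtaskIndex = outputSubtaskIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Descriptor)) {
                return false;
            }
            Descriptor other = (Descriptor) o;
            return inputSubtaskIndex == other.inputSubtaskIndex
                    && outputSubtaskIndex == other.outputSubtaskIndex;
        }

        @Override
        public int hashCode() {
            return Objects.hash(inputSubtaskIndex, outputSubtaskIndex);
        }

        @Override
        public String toString() {
            return "SubtaskConnectionDescriptor{inputSubtaskIndex=" + inputSubtaskIndex
                    + ", outputSubtaskIndex=" + outputSubtaskIndex + "}";
        }
    }

    /** Downstream side: holds the virtual channels it expects after the rescale mapping. */
    static final class Demultiplexer {
        private final Set<Descriptor> knownChannels;
        private Descriptor selected;

        Demultiplexer(Descriptor... known) {
            this.knownChannels = new LinkedHashSet<>(Arrays.asList(known));
        }

        /** Switches to the given virtual channel, or fails if upstream announced an unknown one. */
        void select(Descriptor descriptor) {
            if (!knownChannels.contains(descriptor)) {
                throw new IllegalStateException(
                        "Cannot select " + descriptor + "; known channels are " + knownChannels);
            }
            selected = descriptor;
        }

        Descriptor selected() {
            return selected;
        }
    }

    public static void main(String[] args) {
        // Known channels as reported in the exception message.
        Demultiplexer demux = new Demultiplexer(new Descriptor(1357, 0), new Descriptor(1357, 4200));
        demux.select(new Descriptor(1357, 0)); // a known virtual channel is accepted

        try {
            // Upstream result subpartition state announces a channel the downstream never registered.
            demux.select(new Descriptor(0, 4071));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the mismatch can only surface when upstream actually has persisted subpartition state to replay, which would be consistent with restores succeeding whenever that state is empty.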