[ https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899418#comment-17899418 ]
Feifan Wang commented on FLINK-36743: ------------------------------------- [~arvid] We do use a internal version forked from 1.16.1, but unaligned checkpoint related parts not be changed. ( And I‘m not work for Alibaba now. ) A information is that the sink operator not change parallelism, its upstream scale down from 5600 to 4200. I load the checkpoint metadata of chk-2718336, and I found the sink operator has no input channel state. On the contrary, the sink operator in chk-2718336 has an input channel state. (When I said earlier that both checkpoints have in-flight data, I did not limit it to the sink operator.) !image-2024-11-19-17-27-55-387.png|width=849,height=483! !image-2024-11-19-17-30-14-816.png|width=872,height=615! Thanks for suggestion [~pnowojski] , we will consider upgrade later, but 1.16.1 will still serve for a while. > Rescale from unaligend checkpoint failed > ---------------------------------------- > > Key: FLINK-36743 > URL: https://issues.apache.org/jira/browse/FLINK-36743 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing > Reporter: Feifan Wang > Priority: Major > Attachments: image-2024-11-19-14-58-22-975.png, > image-2024-11-19-17-27-55-387.png, image-2024-11-19-17-30-14-816.png > > > We encountered the following exception when scaling down a job from 5600 to > 4200: > {code:java} > 2024-11-12 19:20:54,308 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: > xxxxxx (1358/1400) > (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) > switched from RUNNING to FAILED on > container_e33_1725519807238_6894116_01_000825 @ yg- > java.lang.IllegalStateException: Cannot select > SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; > known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, > outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, > outputSubtaskIndex=4200}] > at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code} > * Flink version : 1.16.1 > * unaligned checkpoint : enabled > * log-based checkpoint : enabled > The exception encountered when restore from chk-2718336, and it can > successfully restore from chk-2718333. And I checked the metadata file of > chk-2718336 and chk-2718333 , both of them have in-flight data. It looks like > there is something wrong with the unaligned checkpoint when reassign > in-flight data. Could you please help a look ? [~arvid] , [~pnowojski] -- This message was sent by Atlassian Jira (v8.20.10#820010)