[ 
https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899378#comment-17899378
 ] 

Arvid Heise commented on FLINK-36743:
-------------------------------------

I might be missing something obvious, but I think RESCALE should never use 
unaligned checkpoints. We do have multiple safeguards to disallow unaligned 
checkpoints on a rescale edge (that means no channel state will be persisted 
for that edge, only for the others):
 * The RescalePartitioner has isPointwise() = true, which makes it ineligible 
for UC.
 * For restoring it uses the SubtaskStateMapper.UNSUPPORTED, which throws an 
exception on restore.

{noformat}
@Override
public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
    return SubtaskStateMapper.UNSUPPORTED;
}

@Override
public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
    return SubtaskStateMapper.UNSUPPORTED;
}{noformat}
So I'm really confused by what I'm seeing here. I cannot understand why there 
is a channel state and why it tries to use it.
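
As a rough illustration of how the two safeguards above are meant to interact, 
here is a self-contained sketch. It is not Flink code: EdgeSketch, 
MapperSketch, persistsChannelState(), restoreChannelState() and 
oldSubtasksFor() are hypothetical names, and the FULL mapping is a 
simplification that only serves as a contrast to UNSUPPORTED.

```java
import java.util.stream.IntStream;

/** Hypothetical stand-in for a subtask state mapper; not Flink's actual enum. */
enum MapperSketch {
    /** Simplified contrast case: a new subtask reads state of every old subtask. */
    FULL {
        @Override
        int[] oldSubtasksFor(int newIndex, int oldParallelism, int newParallelism) {
            return IntStream.range(0, oldParallelism).toArray();
        }
    },
    /** Second safeguard: redistributing channel state is refused on restore. */
    UNSUPPORTED {
        @Override
        int[] oldSubtasksFor(int newIndex, int oldParallelism, int newParallelism) {
            throw new UnsupportedOperationException(
                    "Cannot redistribute in-flight data of a pointwise edge");
        }
    };

    abstract int[] oldSubtasksFor(int newIndex, int oldParallelism, int newParallelism);
}

/** Hypothetical stand-in for one edge between two operators. */
final class EdgeSketch {
    private final boolean pointwise;
    private final MapperSketch mapper;

    EdgeSketch(boolean pointwise, MapperSketch mapper) {
        this.pointwise = pointwise;
        this.mapper = mapper;
    }

    /** First safeguard: a pointwise edge never persists channel state. */
    boolean persistsChannelState() {
        return !pointwise;
    }

    /** Asks the mapper which old subtasks feed the given new subtask. */
    int[] restoreChannelState(int newIndex, int oldParallelism, int newParallelism) {
        return mapper.oldSubtasksFor(newIndex, oldParallelism, newParallelism);
    }
}
```

Under these assumptions, an edge built as new EdgeSketch(true, 
MapperSketch.UNSUPPORTED) never writes channel state, and any attempt to 
restore channel state for it fails immediately instead of reaching the 
channel-selection logic seen in the stack trace.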

[~pnowojski], [~roman] do you know if we had a bug in 1.16 that attempted to 
use UC in RESCALE? AFAIK, we never shipped UC without the restriction 
implemented by https://issues.apache.org/jira/browse/FLINK-21936. 

> Rescale from unaligend checkpoint failed
> ----------------------------------------
>
>                 Key: FLINK-36743
>                 URL: https://issues.apache.org/jira/browse/FLINK-36743
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Priority: Major
>         Attachments: image-2024-11-19-14-58-22-975.png
>
>
> We encountered the following exception when scaling down a job from 5600 to 
> 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> xxxxxx (1358/1400) 
> (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) 
> switched from RUNNING to FAILED on 
> container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select 
> SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; 
> known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=4200}]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code}
>  * Flink version : 1.16.1
>  * unaligned checkpoint : enabled
>  * log-based checkpoint : enabled
> The exception was encountered when restoring from chk-2718336, whereas the 
> job can successfully restore from chk-2718333. I checked the metadata files 
> of chk-2718336 and chk-2718333; both of them have in-flight data. It looks 
> like something goes wrong with the unaligned checkpoint when reassigning 
> in-flight data. Could you please help take a look? [~arvid], [~pnowojski] 



