[ 
https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899674#comment-17899674
 ] 

Arvid Heise commented on FLINK-36743:
-------------------------------------

I try to dig a bit deeper because there may still be a general bug hidden. Just 
a bit background because you looked at the state and couldn't find any channel 
state for the faulty checkpoint:
 * Channel state is split into resultSubpartitionState on the upstream operator 
and input channel state in the downstream operator
 * You just looked at the downstream operator but the faulty state is on the 
upstream operator in the resultSubpartitionState.
 * Upstream connects to downstream on one physical channel and creates a few 
virtual channels which maps to old physical channels.
 * The error indicates that upstream tries to send data over one virtual 
channel that is unknown downstream.
 * I need to figure out if this is a specific issue to rescale or to the 
mapping. If it's an issue with rescale, you probably need to disable the 
extensions on your job.
 * The reason why it worked most of the time is probably because you don't have 
much backpressure to begin with and most channel state turns out empty (as seen 
in your screenshots). Backpressure first builds up on the input side and then 
translates to the result partition side. Backpressure goes down the other way 
around during checkpoint alignment: first result partitions are cleared, then 
input channels. Unaligned checkpoints first wait a bit and check if an aligned 
checkpoint would work. After the alignment timed out, it turns into unaligned 
checkpoint and persists the data. In most cases, if there is only little 
backpressure, the result partition is already cleared and the input channels 
are still partially filled.
 * In your case, you don't rescale the sink side, so the input side remains 
stable and just works. The issues arise in the subpartition reassignment.
 * For some unknown reason, it looks like in the faulty checkpoint, there is no 
state in the input channels but there is state in the subpartitions on the 
upstream operator. That is very odd as explained above. My only explanation is 
a network issue, where data couldn't be sent downstream for a limited time 
(because the checkpoint barrier eventually went through).

The changes could work but I really need to play around in my IDE to fully 
understand the implications.

For now, I recommend to disable the changes (by unsetting the options) for that 
particular pipeline. At least on the respective RESCALE, it doesn't seem to do 
anything at all (look how empty the state is).

> Rescale from unaligend checkpoint failed
> ----------------------------------------
>
>                 Key: FLINK-36743
>                 URL: https://issues.apache.org/jira/browse/FLINK-36743
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Priority: Major
>         Attachments: 
> Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch,
>  image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png, 
> image-2024-11-19-17-30-14-816.png
>
>
> We encountered the following exception when scaling down a job from 5600 to 
> 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> xxxxxx (1358/1400) 
> (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) 
> switched from RUNNING to FAILED on 
> container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select 
> SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; 
> known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=4200}]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code}
>  * Flink version : 1.16.1
>  * unaligned checkpoint : enabled
>  * log-based checkpoint : enabled
> The exception encountered when restore from chk-2718336, and it can 
> successfully restore from chk-2718333. And I checked the metadata file of 
> chk-2718336 and chk-2718333 , both of them have in-flight data. It looks like 
> there is something wrong with the unaligned checkpoint when reassign 
> in-flight data. Could you please help a look ? [~arvid] , [~pnowojski] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to