[ 
https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899490#comment-17899490
 ] 

Feifan Wang commented on FLINK-36743:
-------------------------------------

Sorry [~arvid], I forgot to mention an important change that my colleague made 
a year ago. It lets users choose whether a job may restore 
forward/rescale/broadcast edges from an unaligned checkpoint after a 
parallelism change. :( 

Here is the patch of this change: 
[^Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch]

^Key change in StreamingJobGraphGenerator :^

 
{code:java}
-        jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
-        jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());
+        if ((partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner)
+                && streamGraph
+                        .getCheckpointConfig()
+                        .isUnalignedCheckpointsWithoutOrderingGuarantees()) {
+            jobEdge.setDownstreamSubtaskStateMapper(SubtaskStateMapper.ROUND_ROBIN);
+            jobEdge.setUpstreamSubtaskStateMapper(SubtaskStateMapper.ARBITRARY);
+        } else if (partitioner instanceof BroadcastPartitioner
+                && streamGraph
+                        .getCheckpointConfig()
+                        .isUnalignedCheckpointsAllowBroadcastInconsistency()) {
+            jobEdge.setDownstreamSubtaskStateMapper(SubtaskStateMapper.ROUND_ROBIN);
+            jobEdge.setUpstreamSubtaskStateMapper(SubtaskStateMapper.ARBITRARY);
+        } else {
+            jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
+            jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());
+        }
{code}
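
To make the branching in the patch easier to follow, here is a minimal, 
self-contained sketch of the same decision. It is only an illustration, not the 
patched Flink code: the PartitionerKind and Mapper enums, the Choice class and 
the choose method are hypothetical stand-ins, and FROM_PARTITIONER is a 
placeholder for "whatever mapper the partitioner itself returns". The two 
boolean parameters correspond to the two options read from the checkpoint 
config in the patch.

```java
public class StateMapperChoiceSketch {

    // Hypothetical stand-in for the partitioner classes checked in the patch.
    enum PartitionerKind { FORWARD, RESCALE, BROADCAST, OTHER }

    // ROUND_ROBIN and ARBITRARY are the mapper names used in the patch;
    // FROM_PARTITIONER is a hypothetical placeholder for the partitioner's own mapper.
    enum Mapper { ROUND_ROBIN, ARBITRARY, FROM_PARTITIONER }

    // Pair of mappers that would be set on the job edge.
    static final class Choice {
        final Mapper downstream;
        final Mapper upstream;

        Choice(Mapper downstream, Mapper upstream) {
            this.downstream = downstream;
            this.upstream = upstream;
        }

        @Override
        public String toString() {
            return "downstream=" + downstream + ", upstream=" + upstream;
        }
    }

    static Choice choose(
            PartitionerKind kind,
            boolean withoutOrderingGuarantees,
            boolean allowBroadcastInconsistency) {
        boolean forwardOrRescale =
                kind == PartitionerKind.FORWARD || kind == PartitionerKind.RESCALE;
        // First branch of the patch: forward/rescale edges with the ordering option on.
        if (forwardOrRescale && withoutOrderingGuarantees) {
            return new Choice(Mapper.ROUND_ROBIN, Mapper.ARBITRARY);
        }
        // Second branch: broadcast edges with the inconsistency option on.
        if (kind == PartitionerKind.BROADCAST && allowBroadcastInconsistency) {
            return new Choice(Mapper.ROUND_ROBIN, Mapper.ARBITRARY);
        }
        // Fallback: keep the mappers defined by the partitioner (original behavior).
        return new Choice(Mapper.FROM_PARTITIONER, Mapper.FROM_PARTITIONER);
    }

    public static void main(String[] args) {
        // The job in this issue: rescale partitioner with both options enabled.
        System.out.println(choose(PartitionerKind.RESCALE, true, true));
        // Same edge with the options disabled falls back to the partitioner's mappers.
        System.out.println(choose(PartitionerKind.RESCALE, false, false));
    }
}
```

With both options off, every edge takes the fallback branch, so the patch only 
changes behavior for jobs that opt in.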
 

This job uses the rescale partitioner only to reduce the number of network 
connections and buffers, so it enables both options.

The job has been running for more than a year with both options turned on, and 
its parallelism has been changed many times. This is the first time we have 
encountered the above exception.

Does this additional information make the situation understandable? 
And do you think the changes in the above patch are meaningful?

> Rescale from unaligend checkpoint failed
> ----------------------------------------
>
>                 Key: FLINK-36743
>                 URL: https://issues.apache.org/jira/browse/FLINK-36743
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Priority: Major
>         Attachments: 
> Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch,
>  image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png, 
> image-2024-11-19-17-30-14-816.png
>
>
> We encountered the following exception when scaling down a job from 5600 to 
> 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: xxxxxx (1358/1400) (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) switched from RUNNING to FAILED on container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, outputSubtaskIndex=4200}]
>     at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
> {code}
>  * Flink version : 1.16.1
>  * unaligned checkpoint : enabled
>  * log-based checkpoint : enabled
> The exception occurs when restoring from chk-2718336, while restoring from 
> chk-2718333 succeeds. I checked the metadata files of chk-2718336 and 
> chk-2718333, and both contain in-flight data. It looks like something goes 
> wrong when the in-flight data of the unaligned checkpoint is reassigned. 
> Could you please take a look? [~arvid], [~pnowojski] 
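
For readers unfamiliar with the message in the stack trace, the following is a 
simplified model of how such a failure can arise: a lookup keyed by the 
(inputSubtaskIndex, outputSubtaskIndex) pair that throws when the requested 
pair was never registered. It is a sketch under that assumption, not Flink's 
DemultiplexingRecordDeserializer; the class names ChannelSelectSketch and 
Descriptor and the methods register and select are hypothetical. The index 
values are copied from the log above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class ChannelSelectSketch {

    // Hypothetical key type modelled on the descriptor printed in the log.
    static final class Descriptor {
        final int inputSubtaskIndex;
        final int outputSubtaskIndex;

        Descriptor(int inputSubtaskIndex, int outputSubtaskIndex) {
            this.inputSubtaskIndex = inputSubtaskIndex;
            this.outputSubtaskIndex = outputSubtaskIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Descriptor)) {
                return false;
            }
            Descriptor other = (Descriptor) o;
            return inputSubtaskIndex == other.inputSubtaskIndex
                    && outputSubtaskIndex == other.outputSubtaskIndex;
        }

        @Override
        public int hashCode() {
            return Objects.hash(inputSubtaskIndex, outputSubtaskIndex);
        }

        @Override
        public String toString() {
            return "SubtaskConnectionDescriptor{inputSubtaskIndex=" + inputSubtaskIndex
                    + ", outputSubtaskIndex=" + outputSubtaskIndex + "}";
        }
    }

    // Channels known to this task, in registration order; the value is a dummy buffer.
    private final Map<Descriptor, StringBuilder> channels = new LinkedHashMap<>();

    void register(Descriptor descriptor) {
        channels.put(descriptor, new StringBuilder());
    }

    // Returns the buffer for a known descriptor, or fails for an unknown one.
    StringBuilder select(Descriptor descriptor) {
        StringBuilder channel = channels.get(descriptor);
        if (channel == null) {
            throw new IllegalStateException(
                    "Cannot select " + descriptor + "; known channels are " + channels.keySet());
        }
        return channel;
    }

    public static void main(String[] args) {
        ChannelSelectSketch task = new ChannelSelectSketch();
        // The two channels the failing subtask reported as known.
        task.register(new Descriptor(1357, 0));
        task.register(new Descriptor(1357, 4200));
        try {
            // The pair carried by the restored in-flight data.
            task.select(new Descriptor(0, 4071));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the error means the restored data refers to a subtask pair that 
the mapping computed at restore time did not include.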



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
