[ https://issues.apache.org/jira/browse/FLINK-38267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Fan updated FLINK-38267:
----------------------------
    Affects Version/s: 2.1.0
                       2.2.0
                       1.20.2
                       2.0.0

> Job cannot be recovered from unaligned checkpoint after rescaling when one task has multiple exchanges
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38267
>                 URL: https://issues.apache.org/jira/browse/FLINK-38267
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 1.20.2, 2.1.0, 2.2.0
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>         Attachments: image-2025-08-19-13-58-15-029.png
>
>
> h1. 1. Phenomenon:
> A job cannot be recovered from a UC (unaligned checkpoint) after rescaling.
> Recovery fails with the following exception:
> {code:java}
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
> Did you change the partitioner to forward or rescale? 
> It may also help to add an explicit shuffle().{code}
>  
> In this ticket, UC stands for unaligned checkpoint.
> h1. 2. Reason
> h2. 2.1 What types of jobs trigger this bug?
> The bug is triggered when one upstream task has multiple output exchanges
> that include both UC-supported exchanges (such as hash or rebalance) and at
> least one UC-unsupported exchange (such as forward or rescale).
> It is also triggered when one downstream task has multiple input exchanges
> that include both UC-supported exchanges (such as hash or rebalance) and at
> least one UC-unsupported exchange (such as forward or rescale).
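> The trigger condition above can be written as a small check over the exchanges
> on one side of a task. This is a hypothetical sketch, not Flink code: the
> `TriggerShapeSketch` class, the `ExchangeType` enum and `isAtRisk` are
> illustrative names. It assumes, as this ticket states, that hash and rebalance
> exchanges support UC while forward and rescale do not.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch (not Flink code): flags a task whose exchanges on one side
 * (all outputs, or all inputs) mix UC-supported and UC-unsupported partitioners.
 */
public class TriggerShapeSketch {

    /** Partitioner kinds named in this ticket. */
    enum ExchangeType { HASH, REBALANCE, FORWARD, RESCALE }

    /** Assumption taken from this ticket: forward and rescale do not support UC. */
    static final Set<ExchangeType> UC_UNSUPPORTED =
            EnumSet.of(ExchangeType.FORWARD, ExchangeType.RESCALE);

    static boolean supportsUc(ExchangeType type) {
        return !UC_UNSUPPORTED.contains(type);
    }

    /**
     * Returns true if the exchanges on one side of a task contain at least one
     * UC-supported exchange and at least one UC-unsupported exchange.
     */
    static boolean isAtRisk(List<ExchangeType> exchangesOnOneSide) {
        boolean hasSupported = exchangesOnOneSide.stream().anyMatch(TriggerShapeSketch::supportsUc);
        boolean hasUnsupported = exchangesOnOneSide.stream().anyMatch(t -> !supportsUc(t));
        return hasSupported && hasUnsupported;
    }

    public static void main(String[] args) {
        // A source with one hash output and one forward output: affected.
        System.out.println(isAtRisk(List.of(ExchangeType.HASH, ExchangeType.FORWARD)));   // true
        // A task whose outputs all support UC: not affected.
        System.out.println(isAtRisk(List.of(ExchangeType.HASH, ExchangeType.REBALANCE))); // false
    }
}
```

> A task with only forward or rescale exchanges is not flagged either, since the
> bug needs UC state from a supported exchange to be present on the same task.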
> h2. 2.2 Why does this bug happen?
> When a job is rescaled and recovered from an unaligned checkpoint, Flink needs
> to redistribute in-flight buffers (input buffers on the downstream side and
> output buffers on the upstream side).
> The ForwardPartitioner and RescalePartitioner exchanges do not support
> unaligned checkpoints, so they are not expected to run the redistribution
> logic. In the current implementation:
>  * For input buffer redistribution[1], if the current task has no input buffer
> state and the upstream task has no output buffer state, the code returns
> early without any redistribution.
>  * For output buffer redistribution[2], if the current task has no output
> buffer state and the downstream task has no input buffer state, the code
> returns early without any redistribution.
> However, this check breaks when an upstream task has multiple output
> exchanges, because it looks at the buffer state of the whole task rather than
> of a single exchange. The following DAG is an example with 3 tasks and 2
> exchanges (Hash and Forward):
>  * The Hash exchange supports unaligned checkpoints.
>  * The Forward exchange does not support unaligned checkpoints.
>  
> !image-2025-08-19-13-58-15-029.png|width=786,height=421!
> When the job is recovered from a UC after rescaling, *Map after forward*
> checks its own input buffer state and the Source's output buffer state. In
> this case the Source task does have output buffer state, but that state
> belongs to the Hash exchange, not the Forward exchange.
> As a result, redistribution is unexpectedly triggered for *Map after forward*,
> even though its forward (pointwise) partitioner cannot be rescaled.
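> The task-level early return described above can be modelled in a few lines to
> show why *Map after forward* is flagged. This is a hypothetical simplification:
> `TaskLevelCheckSketch`, `shouldRedistributeInput` and the
> "task -> exchanges with in-flight buffers" maps are illustrative, not the
> StateAssignmentOperation code.

```java
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical model of the task-level check (not Flink code). Each map entry
 * says which exchanges a task holds in-flight buffer state for.
 */
public class TaskLevelCheckSketch {

    /**
     * Buggy shape: the consumer's input redistribution is skipped only when the
     * consumer has no input state at all AND the producer has no output state at
     * all, regardless of which exchange that state belongs to.
     */
    static boolean shouldRedistributeInput(
            String producer,
            String consumer,
            Map<String, Set<String>> outputStateByTask,
            Map<String, Set<String>> inputStateByTask) {
        boolean producerHasOutputState = !outputStateByTask.getOrDefault(producer, Set.of()).isEmpty();
        boolean consumerHasInputState = !inputStateByTask.getOrDefault(consumer, Set.of()).isEmpty();
        return producerHasOutputState || consumerHasInputState;
    }

    public static void main(String[] args) {
        // Source's output buffers belong to the Hash exchange only.
        Map<String, Set<String>> outputState = Map.of("Source", Set.of("hash"));
        // The Forward exchange carries no UC state on the consumer side.
        Map<String, Set<String>> inputState =
                Map.of("Map after hash", Set.of("hash"), "Map after forward", Set.of());

        // Intended: false. Actual: true, because the Hash-exchange state is counted.
        System.out.println(
                shouldRedistributeInput("Source", "Map after forward", outputState, inputState)); // true
    }
}
```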
>  
> h2. 2.3 Reproduce
> The following job can reproduce this bug easily.
>  
> {code:java}
> import org.apache.commons.math3.random.RandomDataGenerator;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.connector.datagen.source.DataGeneratorSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> /**
>  * Reproduces this issue:
>  * Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
>  * Did you change the partitioner to forward or rescale?
>  * It may also help to add an explicit shuffle().
>  */
> public class UnalignedCheckpointBugDemo {
>     private static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointBugDemo.class);
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         conf.setString("rest.port", "12348");
>         conf.setString("execution.checkpointing.unaligned.enabled", "true");
>         conf.setString("execution.checkpointing.interval", "10s");
>         conf.setString("execution.checkpointing.min-pause", "8s");
>         // Adaptive scheduler: rescaling the job restores it from the latest checkpoint.
>         conf.setString("jobmanager.scheduler", "adaptive");
>         conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         // Without chaining, Source -> "Map after forward" stays a real forward exchange.
>         env.disableOperatorChaining();
>         env.setParallelism(5);
>         SingleOutputStreamOperator<String> stream1 = env.fromSource(
>                         new DataGeneratorSource<>(
>                                 value -> new RandomDataGenerator().nextHexString(300),
>                                 Long.MAX_VALUE,
>                                 RateLimiterStrategy.perSecond(100000),
>                                 Types.STRING),
>                         WatermarkStrategy.noWatermarks(),
>                         "Source Task");
>         // Hash exchange (supports UC); every record gets key 0 and the map is slow,
>         // so in-flight buffers accumulate on this exchange.
>         stream1
>                 .keyBy(new KeySelectorFunction())
>                 .map(x -> {
>                     Thread.sleep(50);
>                     return x;
>                 }).name("Map after hash");
>         // Forward exchange (does not support UC): same parallelism, no explicit partitioner.
>         stream1.map(x -> {
>                     Thread.sleep(5);
>                     return x;
>                 }).name("Map after forward");
>         env.execute(UnalignedCheckpointBugDemo.class.getSimpleName());
>     }
>     private static class KeySelectorFunction implements KeySelector<String, Integer> {
>         @Override
>         public Integer getKey(String value) throws Exception {
>             return 0;
>         }
>     }
> }
> {code}
>  
> h1. 3. Solution
> When the current task checks whether the upstream task has output buffer
> state, it should only check the corresponding exchange instead of all
> exchanges.
> Likewise, when the current task checks whether the downstream task has input
> buffer state, it should only check the corresponding exchange instead of all
> exchanges.
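> A minimal sketch of the proposed per-exchange check, using the same kind of
> simplified model: `PerExchangeCheckSketch`, `hasStateFor` and
> `shouldRedistribute` are hypothetical names, not the actual Flink patch. For a
> single exchange, the input-side check (run for the consumer) and the
> output-side check (run for the producer) look at the same two buffer sets, so
> one predicate covers both directions.

```java
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of the proposed fix (not Flink code): only in-flight state
 * that belongs to the exchange being restored is taken into account.
 */
public class PerExchangeCheckSketch {

    /** True if {@code task} holds in-flight buffers for {@code exchange}. */
    static boolean hasStateFor(Map<String, Set<String>> stateByTask, String task, String exchange) {
        return stateByTask.getOrDefault(task, Set.of()).contains(exchange);
    }

    /** Redistribute only if the producer's output or the consumer's input has state for this exchange. */
    static boolean shouldRedistribute(
            String exchange,
            String producer,
            String consumer,
            Map<String, Set<String>> outputStateByTask,
            Map<String, Set<String>> inputStateByTask) {
        return hasStateFor(outputStateByTask, producer, exchange)
                || hasStateFor(inputStateByTask, consumer, exchange);
    }

    public static void main(String[] args) {
        // Task -> exchanges with in-flight buffers, for the Source / Hash / Forward DAG.
        Map<String, Set<String>> outputState = Map.of("Source", Set.of("hash"));
        Map<String, Set<String>> inputState = Map.of("Map after hash", Set.of("hash"));

        // The Hash exchange is still redistributed.
        System.out.println(shouldRedistribute("hash", "Source", "Map after hash", outputState, inputState));       // true
        // The Forward exchange is skipped, so no redistribution is attempted for it.
        System.out.println(shouldRedistribute("forward", "Source", "Map after forward", outputState, inputState)); // false
    }
}
```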
>  
> [1] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L413]
> [2] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L364]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
