[ 
https://issues.apache.org/jira/browse/FLINK-38267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18015821#comment-18015821
 ] 

Rui Fan commented on FLINK-38267:
---------------------------------

Merged to master(2.2.0) via:
 * 6f171e45f8637147b89815d235091644873a1e86
 * d81751aea0b1de18fec392edd960cc4483a2527e

> Job cannot be recovered from unaligned checkpoint after rescaling when one 
> task has multiple exchanges
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38267
>                 URL: https://issues.apache.org/jira/browse/FLINK-38267
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 1.20.2, 2.1.0, 2.2.0
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2025-08-19-13-58-15-029.png
>
>
> h1. 1. Phenomenon:
> A job cannot be recovered from a UC (unaligned checkpoint) after rescaling; 
> it fails with this exception:
> {code:java}
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner. 
> Did you change the partitioner to forward or rescale? 
> It may also help to add an explicit shuffle().{code}
>  
> UC is the abbreviation of unaligned checkpoint in this ticket.
> h1. 2. Reason
> h2. 2.1 What types of jobs trigger this bug?
> The bug is triggered when one upstream task has multiple output exchanges 
> that include both UC-supported exchanges (such as hash or rebalance) and at 
> least one UC-unsupported exchange (such as forward or rescale).
> It is also triggered when one downstream task has multiple input exchanges 
> that include both UC-supported exchanges (such as hash or rebalance) and at 
> least one UC-unsupported exchange (such as forward or rescale).
> h2. 2.2 Why does this bug happen?
> When a job is rescaled and recovered from an unaligned checkpoint, Flink 
> needs to redistribute in-flight buffers (input buffers on the downstream 
> side and output buffers on the upstream side).
> The ForwardPartitioner and RescalePartitioner exchanges do not support 
> unaligned checkpoints, so the redistribution logic is not expected to run 
> for them. In the current implementation:
>  * For input buffer redistribution[1], if the current task has no input 
> buffer state and the upstream task has no output buffer state, the code 
> returns directly without any redistribution.
>  * For output buffer redistribution[2], if the current task has no output 
> buffer state and the downstream task has no input buffer state, the code 
> returns directly without any redistribution.
> However, this check does not work when an upstream task has multiple output 
> exchanges. The following DAG is an example with 3 tasks and 2 exchanges 
> (Hash and Forward).
>  * The Hash exchange supports unaligned checkpoints.
>  * The Forward exchange does not support unaligned checkpoints.
>  
> !image-2025-08-19-13-58-15-029.png|width=786,height=421!
> When the job is recovered from a UC after rescaling, *Map after forward* 
> checks its own input buffer state and the Source’s output buffer state. In 
> this case the Source task does have output buffer state, but that state 
> belongs to the Hash exchange, not the Forward exchange.
> As a result, redistribution is invoked for *Map after forward*, which is 
> unexpected.
> The same holds from the perspective of the upstream task (Source): it has 2 
> output exchanges, and the Forward exchange should not run the rescale logic 
> even if the Hash exchange has state.
> h2. 2.3 Reproduce
> The following job can reproduce this bug easily.
>  
> {code:java}
> import org.apache.commons.math3.random.RandomDataGenerator;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.connector.datagen.source.DataGeneratorSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> /**
>  * It could reproduce this issue:
>  * Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
>  * Did you change the partitioner to forward or rescale?
>  * It may also help to add an explicit shuffle().
>  */
> public class UnalignedCheckpointBugDemo {
>     private static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointBugDemo.class);
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         conf.setString("rest.port", "12348");
>         conf.setString("execution.checkpointing.unaligned.enabled", "true");
>         conf.setString("execution.checkpointing.interval", "10s");
>         conf.setString("execution.checkpointing.min-pause", "8s");
>         conf.setString("jobmanager.scheduler", "adaptive");
>         conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         env.disableOperatorChaining();
>         env.setParallelism(5);
>         SingleOutputStreamOperator<String> stream1 = env.fromSource(
>                         new DataGeneratorSource<>(
>                                 value -> new RandomDataGenerator().nextHexString(300),
>                                 Long.MAX_VALUE,
>                                 RateLimiterStrategy.perSecond(100000),
>                                 Types.STRING),
>                         WatermarkStrategy.noWatermarks(),
>                         "Source Task");
>         stream1
>                 .keyBy(new KeySelectorFunction())
>                 .map(x -> {
>                     Thread.sleep(50);
>                     return x;
>                 }).name("Map after hash");
>         stream1.map(x -> {
>                     Thread.sleep(5);
>                     return x;
>             }).name("Map after forward");
>         env.execute(UnalignedCheckpointBugDemo.class.getSimpleName());
>     }
>     private static class KeySelectorFunction implements KeySelector<String, Integer> {
>         @Override
>         public Integer getKey(String value) throws Exception {
>             return 0;
>         }
>     }
> }
> {code}
>  
> h1. 3. Solution
> The implemented solution was to make the state redistribution logic more 
> granular by checking for in-flight data on a *per-exchange* basis instead of 
> a per-task basis.
>  # *Precise State Tracking:* The {{TaskStateAssignment}} class was refactored 
> to no longer use a simple boolean flag. It now precisely tracks which 
> specific input gates and result partitions contain in-flight data.
>  # *Per-Channel/Partition Checks:* The core redistribution methods, 
> {{reDistributeInputChannelStates}} and 
> {{reDistributeResultSubpartitionStates}}, were modified. Their internal 
> logic now iterates through each input gate or output partition and uses new 
> helper methods ({{hasInFlightDataForInputGate}} and 
> {{hasInFlightDataForResultPartition}}) to check whether that _specific 
> channel_ has state.
>  # *Conditional Logic:* The state redistribution logic is now wrapped in a 
> conditional block. It is only invoked for a channel if the per-exchange check 
> passes. This ensures that stateless exchanges (like {{forward}} or 
> {{rescale}}) are correctly skipped, avoiding the exception.
> This approach fixes the bug by applying the redistribution logic only where 
> it is actually needed, allowing jobs with mixed partitioner types to rescale 
> from an unaligned checkpoint successfully.
>  
> [1] 
> [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L413]
> [2] 
> [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L364]
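
To illustrate the difference between the task-level check (section 2.2 of the quoted description) and the per-exchange check (section 3), here is a minimal, self-contained Java sketch. It is not Flink code: the class PerExchangeCheckSketch, its two methods, and the map from exchange name to an "has in-flight data" flag are hypothetical names invented for this example. It only models why a check over the whole task fires for the forward exchange, while a check scoped to a single exchange does not.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of the two checks; none of these names exist in Flink. */
class PerExchangeCheckSketch {

    /**
     * Task-level check (the old behaviour, simplified): returns true as soon
     * as ANY exchange of the task holds in-flight buffers.
     */
    static boolean anyExchangeHasInFlightData(Map<String, Boolean> inFlightByExchange) {
        return inFlightByExchange.containsValue(true);
    }

    /**
     * Per-exchange check (the fixed behaviour, simplified): returns true only
     * if the named exchange itself holds in-flight buffers.
     */
    static boolean exchangeHasInFlightData(
            Map<String, Boolean> inFlightByExchange, String exchange) {
        return inFlightByExchange.getOrDefault(exchange, false);
    }

    public static void main(String[] args) {
        // Assumed state of the Source task in the example DAG: the hash
        // exchange has in-flight output buffers, the forward exchange has none.
        Map<String, Boolean> sourceOutputs = new LinkedHashMap<>();
        sourceOutputs.put("hash", true);
        sourceOutputs.put("forward", false);

        // Task-level: the hash state makes the check pass for every exchange,
        // so redistribution would also be attempted for forward.
        System.out.println("task-level check: "
                + anyExchangeHasInFlightData(sourceOutputs));

        // Per-exchange: each exchange is decided on its own state.
        for (String exchange : sourceOutputs.keySet()) {
            System.out.println(exchange + " needs redistribution: "
                    + exchangeHasInFlightData(sourceOutputs, exchange));
        }
    }
}
```

In this model the forward exchange is skipped because the decision looks only at its own state, which mirrors the intent of the fix described in the ticket. The real implementation tracks input gates and result partitions rather than exchange names.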



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
