[ https://issues.apache.org/jira/browse/FLINK-38267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18015821#comment-18015821 ]
Rui Fan commented on FLINK-38267:
---------------------------------

Merged to master (2.2.0) via:
* 6f171e45f8637147b89815d235091644873a1e86
* d81751aea0b1de18fec392edd960cc4483a2527e

> Job cannot be recovered from unaligned checkpoint after rescaling when one task has multiple exchanges
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38267
> URL: https://issues.apache.org/jira/browse/FLINK-38267
> Project: Flink
> Issue Type: Bug
> Affects Versions: 2.0.0, 1.20.2, 2.1.0, 2.2.0
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2025-08-19-13-58-15-029.png
>
>
> h1. 1. Phenomenon
> The job cannot be recovered from a UC (unaligned checkpoint) after rescaling. The exception is:
> {code:java}
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
> Did you change the partitioner to forward or rescale?
> It may also help to add an explicit shuffle().{code}
>
> UC is the abbreviation of unaligned checkpoint in this ticket.
> h1. 2. Reason
> h2. 2.1 What types of jobs trigger this bug?
> Jobs in which one upstream task has multiple output exchanges, including both UC-supported exchanges (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).
> Or jobs in which one downstream task has multiple input exchanges, including both UC-supported exchanges (such as hash or rebalance) and at least one UC-unsupported exchange (such as forward or rescale).
> h2. 2.2 Why does this bug happen?
> When a job is rescaled and recovered from an unaligned checkpoint, Flink needs to redistribute in-flight buffers (input buffers on the downstream side and output buffers on the upstream side).
> Exchanges that use ForwardPartitioner or RescalePartitioner do not support unaligned checkpoints, so they are not expected to go through the redistribution logic.
> In the code:
> * For input buffer redistribution [1], if the current task has no input buffer state and the upstream task has no output buffer state, the code returns directly without any redistribution.
> * For output buffer redistribution [2], if the current task has no output buffer state and the downstream task has no input buffer state, the code returns directly without any redistribution.
> But this check does not work when the upstream task has multiple output exchanges.
> The following DAG is an example. It has 3 tasks and 2 exchanges (hash and forward).
> * The hash exchange supports unaligned checkpoints
> * The forward exchange does not support unaligned checkpoints
>
> !image-2025-08-19-13-58-15-029.png|width=786,height=421!
> When the job is recovered from a UC (unaligned checkpoint) after rescaling, the *Map after forward* task checks its own input buffer state and the Source’s output buffer state. The Source task does have output buffer state in this case, but that state belongs to the hash exchange, not the forward exchange.
> As a result, redistribution is invoked for {*}Map after forward{*}, which is unexpected.
> The same holds from the perspective of the upstream task (Source task): it has 2 output exchanges, and the forward exchange should not go through the rescale logic even if the hash exchange has state.
> h2. 2.3 Reproduce
> The following job can reproduce this bug easily.
>
> {code:java}
> import org.apache.commons.math3.random.RandomDataGenerator;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.connector.datagen.source.DataGeneratorSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * It could reproduce this issue:
>  * Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
>  * Did you change the partitioner to forward or rescale?
>  * It may also help to add an explicit shuffle().
>  */
> public class UnalignedCheckpointBugDemo {
>     private static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointBugDemo.class);
>
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         conf.setString("rest.port", "12348");
>         conf.setString("execution.checkpointing.unaligned.enabled", "true");
>         conf.setString("execution.checkpointing.interval", "10s");
>         conf.setString("execution.checkpointing.min-pause", "8s");
>         conf.setString("jobmanager.scheduler", "adaptive");
>         conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         env.disableOperatorChaining();
>         env.setParallelism(5);
>
>         SingleOutputStreamOperator<String> stream1 = env.fromSource(
>                 new DataGeneratorSource<>(
>                         value -> new RandomDataGenerator().nextHexString(300),
>                         Long.MAX_VALUE,
>                         RateLimiterStrategy.perSecond(100000),
>                         Types.STRING),
>                 WatermarkStrategy.noWatermarks(),
>                 "Source Task");
>
>         stream1
>                 .keyBy(new KeySelectorFunction())
>                 .map(x -> {
>                     Thread.sleep(50);
>                     return x;
>                 }).name("Map after hash");
>
>         stream1.map(x -> {
>             Thread.sleep(5);
>             return x;
>         }).name("Map after forward");
>
>         env.execute(UnalignedCheckpointBugDemo.class.getSimpleName());
>     }
>
>     private static class KeySelectorFunction implements KeySelector<String, Integer> {
>         @Override
>         public Integer getKey(String value) throws Exception {
>             return 0;
>         }
>     }
> }
> {code}
>
> h1. 3. Solution
> The implemented solution was to make the state redistribution logic more granular by checking for in-flight data on a *per-exchange* basis instead of a per-task basis.
> # *Precise State Tracking:* The {{TaskStateAssignment}} class was refactored to no longer use a simple boolean flag. It now precisely tracks which specific input gates and result partitions contain in-flight data.
> # *Per-Channel/Partition Checks:* The core redistribution methods, {{reDistributeInputChannelStates}} and {{{}reDistributeResultSubpartitionStates{}}}, were modified. Their internal logic now iterates through each input gate or output partition and uses new helper methods ({{{}hasInFlightDataForInputGate{}}} and {{{}hasInFlightDataForResultPartition{}}}) to check if that _specific channel_ has state.
> # *Conditional Logic:* The state redistribution logic is now wrapped in a conditional block. It is only invoked for a channel if the per-exchange check passes. This ensures that stateless exchanges (like {{forward}} or {{{}rescale{}}}) are correctly skipped, avoiding the exception.
> This approach fixes the bug by applying the redistribution logic only where it is actually needed, allowing jobs with mixed partitioner types to rescale from an unaligned checkpoint successfully.
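
The difference between the per-task and the per-exchange check can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the Flink implementation: the class {{PerExchangeCheckSketch}}, the {{Exchange}} type and both {{redistribute...}} methods are hypothetical names made up for illustration, and only the exception message is taken from the ticket. It models the Source task from the example DAG, whose hash exchange has in-flight data while its forward exchange has none.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model; none of these types exist in Flink. */
public class PerExchangeCheckSketch {

    /** One output exchange of a task (assumption: reduced to three fields). */
    static final class Exchange {
        final String name;
        final boolean pointwise;        // forward or rescale: cannot be redistributed
        final boolean hasInFlightData;  // buffers captured by the unaligned checkpoint

        Exchange(String name, boolean pointwise, boolean hasInFlightData) {
            this.name = name;
            this.pointwise = pointwise;
            this.hasInFlightData = hasInFlightData;
        }
    }

    /** Per-task check: one flag for the whole task decides for every exchange. */
    static List<String> redistributePerTask(List<Exchange> exchanges) {
        boolean taskHasInFlightData = false;
        for (Exchange e : exchanges) {
            taskHasInFlightData |= e.hasInFlightData;
        }
        List<String> redistributed = new ArrayList<>();
        if (!taskHasInFlightData) {
            return redistributed; // early return, nothing to redistribute
        }
        for (Exchange e : exchanges) {
            redistribute(e, redistributed); // also reaches stateless pointwise exchanges
        }
        return redistributed;
    }

    /** Per-exchange check: only exchanges that actually hold state are touched. */
    static List<String> redistributePerExchange(List<Exchange> exchanges) {
        List<String> redistributed = new ArrayList<>();
        for (Exchange e : exchanges) {
            if (e.hasInFlightData) {
                redistribute(e, redistributed);
            }
        }
        return redistributed;
    }

    private static void redistribute(Exchange e, List<String> redistributed) {
        if (e.pointwise) {
            throw new UnsupportedOperationException(
                    "Cannot rescale the given pointwise partitioner.");
        }
        redistributed.add(e.name);
    }

    public static void main(String[] args) {
        // Source task from the example DAG: hash has state, forward has none.
        List<Exchange> sourceOutputs = List.of(
                new Exchange("hash", false, true),
                new Exchange("forward", true, false));

        try {
            redistributePerTask(sourceOutputs);
        } catch (UnsupportedOperationException ex) {
            System.out.println("per-task check failed: " + ex.getMessage());
        }
        System.out.println("per-exchange check redistributed: "
                + redistributePerExchange(sourceOutputs));
    }
}
```

In this model the per-task variant fails because the state of the hash exchange sets the task-level flag, which then drags the stateless forward exchange into redistribution. The per-exchange variant redistributes only the hash exchange and skips the forward exchange.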
>
> [1] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L413]
> [2] [https://github.com/apache/flink/blob/250ab882a339e4b1c512f788c8aaa722d6b99b77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L364]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)