[ https://issues.apache.org/jira/browse/FLINK-38357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18020366#comment-18020366 ]
Xinglong Wang commented on FLINK-38357:
---------------------------------------

[~fanrui] Could you please take a look at this issue? If I have misunderstood anything or if there are any mistakes in my reasoning, please feel free to correct me. Thank you!

> ResultSubpartitionRecoveredStateHandler.getSubpartition ArrayIndexOutOfBoundsException: 0
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-38357
>                 URL: https://issues.apache.org/jira/browse/FLINK-38357
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.16.1, 2.1.0
>            Reporter: Xinglong Wang
>            Priority: Major
>         Attachments: image-2025-09-15-21-34-19-517.png
>

h2. Problem

For a StreamTask whose operator chain is {*}_map -> (filter -> Sink: sink-r, map-q)_{*}, the tailOperator/outputOperator is {_}*Sink: sink-r*{_}:
{code:java}
// TaskStateAssignment
outputOperatorID = operatorIDs.get(0).getGeneratedOperatorID();

// OperatorChain
this.tailOperatorWrapper = allOpWrappers.get(0); {code}
In reality, however, *_map-q_* is the operator connected to the downstream Task.

If we change the parallelism appropriately, we can end up in the following situation:
* the StreamTask becomes {*}_map -> (filter, map-q)_{*},
* and _*Sink: sink-r*_ becomes a downstream StreamTask.

In this situation, *ResultSubpartitionRecoveredStateHandler#getSubpartition* throws ArrayIndexOutOfBoundsException: 0, since the Sink does not have writers.
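The single-output assumption behind those two snippets can be illustrated with a minimal, self-contained sketch; the class and method names here are hypothetical stand-ins, not Flink's real classes:

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch of the single-output assumption: like
// TaskStateAssignment's operatorIDs.get(0) and OperatorChain's
// allOpWrappers.get(0), index 0 of the chained operators is picked as
// "the" output operator, even though several operators in the chain
// may emit to downstream tasks.
public class SingleOutputAssumption {

    // Stand-in for "take operator 0 as the tail/output operator".
    static String assumedOutputOperator(List<String> operatorsInChain) {
        return operatorsInChain.get(0);
    }

    public static void main(String[] args) {
        // Chain for map -> (filter -> Sink: sink-r, map-q); operator 0
        // happens to be the Sink here.
        List<String> chain = Arrays.asList("Sink: sink-r", "map-q", "filter", "map");
        System.out.println("assumed output operator: " + assumedOutputOperator(chain));
        // But map-q is the operator actually connected to the downstream task.
    }
}
```

The sketch only shows that a single index-0 choice cannot represent a chain with more than one output operator.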
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.getSubpartition(RecoveredChannelStateHandler.java:217) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.lambda$calculateMapping$1(RecoveredChannelStateHandler.java:237) ~[flink-dist-1.16.1.jar:1.16.1]
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_312]
    at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_312]
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_312]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_312]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_312]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_312]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_312]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_312]
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.calculateMapping(RecoveredChannelStateHandler.java:238) ~[flink-dist-1.16.1.jar:1.16.1]
    at java.util.HashMap.computeIfAbsent(HashMap.java:1128) ~[?:1.8.0_312]
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.getMappedChannels(RecoveredChannelStateHandler.java:227) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.getBuffer(RecoveredChannelStateHandler.java:182) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler.getBuffer(RecoveredChannelStateHandler.java:157) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateChunkReader.readChunk(SequentialChannelStateReaderImpl.java:200) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl.readSequentially(SequentialChannelStateReaderImpl.java:109) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl.read(SequentialChannelStateReaderImpl.java:95) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl.readOutputData(SequentialChannelStateReaderImpl.java:81) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:735) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:713) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:679) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:906) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) [flink-dist-1.16.1.jar:1.16.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312] {code}

h2. Analysis

When a Task contains multiple output operators, the current implementation only treats one of them as the tailOperator/outputOperator.
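The ArrayIndexOutOfBoundsException in the trace above can be reproduced in miniature. This is a hedged sketch, not Flink's actual getSubpartition implementation: a task whose chosen output operator is a Sink owns no ResultPartitionWriters, so any index into its (empty) writer array is out of bounds:

```java
// Hypothetical stand-in for ResultSubpartitionRecoveredStateHandler#getSubpartition:
// recovering output channel state indexes into the task's writer array.
public class RecoveryIndexSketch {

    static String getSubpartition(String[] partitionWriters, int index) {
        // For a Sink task, partitionWriters is empty, so even index 0 throws.
        return partitionWriters[index];
    }

    public static void main(String[] args) {
        String[] mapTaskWriters = {"writer-0"}; // map-q really is connected downstream
        String[] sinkTaskWriters = {};          // Sink: sink-r has no writers

        System.out.println(getSubpartition(mapTaskWriters, 0)); // fine
        try {
            getSubpartition(sinkTaskWriters, 0);
        } catch (ArrayIndexOutOfBoundsException e) {
            // Mirrors the failure reported during channel state restoration.
            System.out.println("caught ArrayIndexOutOfBoundsException");
        }
    }
}
```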
If the tailOperator/outputOperator is set to a Sink, this leads to an error when restoring the channel state for the ResultPartition: indexing into the ResultPartitionWriter array fails with an "index 0 out of bounds" exception.

h2. Extension

If there are multiple output operators, each connected to a different downstream task, the current Flink implementation's assumption of a single tailOperator/outputOperator breaks down. For example, in a job with three outputs:

!image-2025-09-15-21-34-19-517.png|width=158,height=157!

The mapping between ResultPartition and output operator needs to be more fine-grained to correctly preserve the relationship during channel state restoration.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)