[ https://issues.apache.org/jira/browse/FLINK-22686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17349166#comment-17349166 ]
Dawid Wysakowicz commented on FLINK-22686:
------------------------------------------

The reason for this particular exception is that the two input gates have different partitioning policies, which breaks an assumption made in the implementation of unaligned checkpoint (UC) rescaling. While implementing a fix for it, I realized that there is another problem underneath: UC rescaling does not work correctly with BROADCAST partitioning at all. With UC we cannot guarantee that the state of each parallel operator instance is consistent with regard to the processed records. Some instances might have already consumed their copy of a particular event, while others might not have; for example, at checkpoint time subtask 0 may have processed a broadcast event that subtask 1 still holds in its input channel state. There is then no way to reliably rescale the channel state so that every instance sees the same set of events after rescaling. We decided not to fix the original issue in 1.13.1, as doing so might actually enable the newly discovered one, which in turn could lead to silent inconsistencies in the restored state. We will include a fix for both issues in 1.14. (A sketch of a topology that could trigger the original exception is at the end of this message.)

> Incompatible subtask mappings while resuming from unaligned checkpoints
> -----------------------------------------------------------------------
>
>                 Key: FLINK-22686
>                 URL: https://issues.apache.org/jira/browse/FLINK-22686
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0
>            Reporter: Arvid Heise
>            Assignee: Dawid Wysakowicz
>            Priority: Blocker
>             Fix For: 1.14.0
>
>         Attachments: topology_1.png, topology_2.png, topology_3.png
>
>
> A user [reported|https://lists.apache.org/x/list.html?u...@flink.apache.org:lte=1M:Flink%201.13.0%20reactive%20mode:%20Job%20stop%20and%20cannot%20restore%20from%20checkpoint] that he encountered an internal error while resuming during reactive mode. There isn't an immediate connection to reactive mode, so it's safe to assume that one rescaling case was not covered.
> {noformat}
> Caused by: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?
> Found RescaleMappings{mappings=[
>   [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29],
>   [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59],
>   [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89],
>   [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119],
>   [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149],
>   [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179],
>   [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]}
> and RescaleMappings{mappings=[
>   [0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203],
>   [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204],
>   [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205],
>   [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206],
>   [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207],
>   [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208],
>   [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
> at org.apache.flink.runtime.checkpoint.TaskStateAssignment.checkSubtaskMapping(TaskStateAssignment.java:322) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:306) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:409) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:193) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1566) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1646) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:986) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:976) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
> at java.lang.Thread.run(Thread.java:834) ~[?:?]
> {noformat}
> Here it seems that the same gate gets input from a range-partitioned and a round-robin partitioned channel at the same time. During the implementation of FLINK-19801, we couldn't find such a case and optimized the implementation accordingly.
> We have asked the user to provide his topology.
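For reference on why the check fires: the two {{RescaleMappings}} in the exception describe two contradictory assignments of the same 210 old channel indices onto 7 new subtasks, one by contiguous ranges and one round-robin. The loop below is only an illustration of the two patterns read off the exception text, not Flink's actual assignment code:

{code:java}
public class MappingIllustration {
    public static void main(String[] args) {
        // 210 old channel indices onto 7 new subtasks, as in the exception above.
        int oldChannels = 210;
        int newParallelism = 7;
        int rangeSize = oldChannels / newParallelism; // 30

        for (int i = 0; i < oldChannels; i++) {
            // First mapping: contiguous ranges -- subtask 0 gets [0..29], subtask 1 gets [30..59], ...
            int rangeSubtask = i / rangeSize;
            // Second mapping: round-robin -- subtask 0 gets {0, 7, 14, ...}, subtask 1 gets {1, 8, 15, ...}
            int roundRobinSubtask = i % newParallelism;
            if (rangeSubtask != roundRobinSubtask) {
                System.out.printf(
                        "old channel %d: range mapping -> subtask %d, round-robin mapping -> subtask %d%n",
                        i, rangeSubtask, roundRobinSubtask);
            }
        }
    }
}
{code}

The two assignments disagree for most indices, so there is no consistent way to hand the recorded in-flight data of one old channel to a single new subtask, which is exactly what {{checkSubtaskMapping}} rejects.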
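And since we are still waiting for the user's topology, here is a minimal sketch of the kind of job that, to my understanding, should exercise the same code path. Everything in it is an assumption for illustration (class name, sources, parallelism): a single downstream task fed by one round-robin ({{rebalance()}}) input and one BROADCAST input, with unaligned checkpoints enabled, restored at a different parallelism (e.g. via reactive mode):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical repro sketch for FLINK-22686 -- NOT the user's actual job.
public class MixedPartitioningRepro {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Unaligned checkpoints persist in-flight channel state; this is the
        // state that has to be redistributed when the job is rescaled.
        env.enableCheckpointing(10_000L);
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        // Input 1: round-robin (REBALANCE) partitioned.
        DataStream<Long> events = env.fromSequence(0, Long.MAX_VALUE).rebalance();

        // Input 2: BROADCAST partitioned -- every downstream subtask receives a
        // copy of every record, so its channel state cannot be split unambiguously.
        DataStream<Long> control = env.fromSequence(0, 1_000).broadcast();

        // One downstream task fed by two differently partitioned inputs. Restoring
        // an unaligned checkpoint of this job at a different parallelism should
        // run into the incompatible-mappings check shown above.
        events.union(control)
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value;
                    }
                })
                .name("mixed-partitioning-input");

        env.execute("FLINK-22686 sketch");
    }
}
{code}

The key ingredient is the single consumer with two differently partitioned inputs; which exact pair of partitioners is involved should not matter for hitting the check.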