[ 
https://issues.apache.org/jira/browse/FLINK-22686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17349166#comment-17349166
 ] 

Dawid Wysakowicz commented on FLINK-22686:
------------------------------------------

The reason for the particular exception is that the two input gates have 
different partitioning policies which breaks the assumption made for the 
implementation of UC rescaling. During implementing a fix for it I realized 
that there is another problem underneath that UC rescaling does not work well 
with BROADCAST partitioning whatsoever. For UC we cannot guarantee that the 
state of each operator is consistent in regards to processed records. Some of 
the operators might have already consumed a copy of a particular event, while 
others might have not. There is no way to reliably rescale the state of the 
channels so that each operator sees the same set of events after rescaling.

We decided not to fix the original issue in 1.13.1 as it might actually enable 
the discovered one, which in turn can lead to silent inconsistencies in the 
restored state. We will include a fix for both issues in 1.14.

> Incompatible subtask mappings while resuming from unaligned checkpoints
> -----------------------------------------------------------------------
>
>                 Key: FLINK-22686
>                 URL: https://issues.apache.org/jira/browse/FLINK-22686
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0
>            Reporter: Arvid Heise
>            Assignee: Dawid Wysakowicz
>            Priority: Blocker
>             Fix For: 1.14.0
>
>         Attachments: topology_1.png, topology_2.png, topology_3.png
>
>
> A user 
> [reported|https://lists.apache.org/x/list.html?u...@flink.apache.org:lte=1M:Flink%201.13.0%20reactive%20mode:%20Job%20stop%20and%20cannot%20restore%20from%20checkpoint]
>  that he encountered an internal error while resuming during reactive mode. 
> There isn't an immediate connection to reactive mode, so it's safe to assume 
> that one rescaling case was not covered.
> {noformat}
> Caused by: java.lang.IllegalStateException: Incompatible subtask mappings: 
> are multiple operators ingesting/producing intermediate results with varying 
> degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 
> 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 
> 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 
> 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 
> 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 
> 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 
> 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 
> 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 
> 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 
> 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 
> 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 
> 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 
> 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 
> 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 
> 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 
> 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 
> 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 
> 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 
> 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 
> 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 
> 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 
> 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 
> 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 
> 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 
> 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 
> 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 
> 209]]}.
>         at 
> org.apache.flink.runtime.checkpoint.TaskStateAssignment.checkSubtaskMapping(TaskStateAssignment.java:322)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:306)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:409)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:193)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1566)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1646)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:986)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:976)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>         at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>  ~[?:?]
>         at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>  ~[?:?]
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>         at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  ~[?:?]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>         at java.lang.Thread.run(Thread.java:834) ~[?:?]
> {noformat}
> Here it seems that the same gate gets input from a range-partitioned and a 
> round-robin partitioned channel at the same time. During the implementation 
> of FLINK-19801, we couldn't find such a case and optimized the implementation 
> accordingly.
> We have asked the user to provide his topology.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to