[ https://issues.apache.org/jira/browse/FLINK-37670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski reassigned FLINK-37670:
--------------------------------------

    Assignee: Piotr Nowojski

> Watermark alignment can deadlock job if there are no more splits to be assigned
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-37670
>                 URL: https://issues.apache.org/jira/browse/FLINK-37670
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Task
>    Affects Versions: 2.0.0, 1.19.2, 1.20.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>
> {{SourceCoordinator#announceCombinedWatermark}} stops announcing the combined watermark to a subtask once {{context.hasNoMoreSplits(subtaskId)}} returns true:
> {noformat}
> for (Integer subtaskId : subTaskIds) {
>     // when subtask have been finished, do not send event.
>     if (!context.hasNoMoreSplits(subtaskId)) {
>         // Subtask maybe during deploying or restarting, so we only send
>         // WatermarkAlignmentEvent to ready task to avoid period task fail
>         // (Java-ThreadPoolExecutor will not schedule the period task if it throws an
>         // exception).
>         context.sendEventToSourceOperatorIfTaskReady(
>                 subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
>     }
> }
> {noformat}
> This means that a new max allowed watermark is never announced to such a subtask again, preventing the sources from making progress. The intention behind the check was to prevent failures caused by sending events to tasks that are no longer running:
> {noformat}
> Caused by: org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FINISHED
>     at org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1613) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask(TaskExecutor.java:1478) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
>     at jdk.internal.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
>     at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
>     at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
> {noformat}
> However:
> # The check is incorrect. Contrary to the comment, it does not check whether the subtask has finished, but only whether the source coordinator has no more splits to assign to it.
> # Even if it did check whether the subtasks have finished, that would not work, because:
> #* one subtask can be finished while others are still running
> #* there can be a race condition between sending an operator event to a running operator and that operator switching to FINISHED
>
> I think that, at least for some non-critical events, we need to simply ignore {{TaskNotRunningException}}. A rough sketch of that idea follows.
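>
> To illustrate the error handling only: the {{EventSender}} callback and the class below are made up for this example and are not the actual {{SourceCoordinatorContext}} API.
> {noformat}
> import java.util.List;
> import java.util.concurrent.CompletableFuture;
> import java.util.function.Consumer;
>
> public class WatermarkAnnouncementSketch {
>
>     /** Hypothetical stand-in for whatever actually delivers the event to a subtask. */
>     interface EventSender {
>         CompletableFuture<Void> send(int subtaskId, long maxAllowedWatermark);
>     }
>
>     /**
>      * Announce the combined watermark to every subtask, treating "task is not
>      * running" as a benign condition instead of suppressing the announcement.
>      */
>     static void announce(
>             List<Integer> subtaskIds,
>             long maxAllowedWatermark,
>             EventSender sender,
>             Consumer<Throwable> fatalErrorHandler) {
>         for (int subtaskId : subtaskIds) {
>             sender.send(subtaskId, maxAllowedWatermark)
>                     .whenComplete(
>                             (ignored, failure) -> {
>                                 if (failure == null || isTaskNotRunning(failure)) {
>                                     // Either delivered, or the subtask already finished /
>                                     // is restarting. The alignment event is non-critical,
>                                     // so skip this subtask and keep the periodic
>                                     // announcement alive for the remaining ones.
>                                     return;
>                                 }
>                                 // Everything else is still treated as a real failure.
>                                 fatalErrorHandler.accept(failure);
>                             });
>         }
>     }
>
>     /** Walks the cause chain looking for the "task not running" condition. */
>     private static boolean isTaskNotRunning(Throwable t) {
>         for (Throwable cur = t; cur != null; cur = cur.getCause()) {
>             // In Flink this would be
>             // org.apache.flink.runtime.operators.coordination.TaskNotRunningException.
>             if ("TaskNotRunningException".equals(cur.getClass().getSimpleName())) {
>                 return true;
>             }
>         }
>         return false;
>     }
> }
> {noformat}
> With something like this in place, the {{hasNoMoreSplits}} guard in {{SourceCoordinator#announceCombinedWatermark}} could presumably be dropped, since a finished subtask would no longer fail the periodic task.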