Piotr Nowojski created FLINK-37670:
--------------------------------------

             Summary: Watermark alignment can deadlock job if there are no more splits to be assigned
                 Key: FLINK-37670
                 URL: https://issues.apache.org/jira/browse/FLINK-37670
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Runtime / Task
    Affects Versions: 1.20.1, 1.19.2, 2.0.0
            Reporter: Piotr Nowojski

{{SourceCoordinator#announceCombinedWatermark}} stops announcing the combined watermark if {{context.hasNoMoreSplits(subtaskId)}} returns true.

{noformat}
for (Integer subtaskId : subTaskIds) {
    // when subtask have been finished, do not send event.
    if (!context.hasNoMoreSplits(subtaskId)) {
        // Subtask maybe during deploying or restarting, so we only send
        // WatermarkAlignmentEvent to ready task to avoid period task fail
        // (Java-ThreadPoolExecutor will not schedule the period task if it throws an
        // exception).
        context.sendEventToSourceOperatorIfTaskReady(
                subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
    }
}
{noformat}

This means the new max allowed watermark is no longer announced, which prevents sources from making progress.

The intention was to prevent failures caused by tasks not running:

{noformat}
Caused by: org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FINISHED
    at org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1613) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask(TaskExecutor.java:1478) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
    at jdk.internal.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
{noformat}

but:
# This is an incorrect check. Contrary to the comment, it doesn't check whether all subtasks have finished, only that the source coordinator has no more splits to assign.
# Even if it did check whether all subtasks had finished, that wouldn't work because:
* one subtask can be finished while others are still running
* there can be race conditions between sending an operator event to a running operator and that operator switching to finished

I think that, at least for some non-critical events, we need to just ignore {{TaskNotRunningException}}.
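
To illustrate the last suggestion, here is a minimal, self-contained sketch of a "best effort" send that swallows only the task-not-running failure and lets every other error propagate. It is not the Flink implementation: {{EventSender}}, {{sendBestEffort}}, {{ignoredFailures}} and the local {{TaskNotRunningException}} class are hypothetical stand-ins so the example runs without Flink on the classpath, and it assumes the send result is exposed as a {{CompletableFuture}}.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

public class IgnoreTaskNotRunningSketch {

    /** Hypothetical stand-in for Flink's TaskNotRunningException. */
    static class TaskNotRunningException extends Exception {
        TaskNotRunningException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for whatever delivers an operator event to a subtask. */
    interface EventSender {
        CompletableFuture<Void> send(int subtaskId, long maxAllowedWatermark);
    }

    /** Counts swallowed failures so the behaviour is observable in this sketch. */
    static final AtomicInteger ignoredFailures = new AtomicInteger();

    /** Sends a non-critical event; a finished or not-yet-running task is not an error. */
    static CompletableFuture<Void> sendBestEffort(
            EventSender sender, int subtaskId, long maxAllowedWatermark) {
        return sender.send(subtaskId, maxAllowedWatermark)
                .exceptionally(error -> {
                    Throwable cause = unwrap(error);
                    if (cause instanceof TaskNotRunningException) {
                        // Non-critical event: the next periodic announcement will retry.
                        ignoredFailures.incrementAndGet();
                        return null;
                    }
                    // Anything else is still treated as a real failure.
                    throw new CompletionException(cause);
                });
    }

    /** Strips CompletionException wrappers added by dependent future stages. */
    static Throwable unwrap(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        // Subtask 0 has already finished; subtasks 1 and 2 are still running.
        EventSender sender = (subtaskId, watermark) -> {
            if (subtaskId == 0) {
                return CompletableFuture.<Void>failedFuture(
                        new TaskNotRunningException("Task is not running, but in state FINISHED"));
            }
            return CompletableFuture.<Void>completedFuture(null);
        };
        for (int subtaskId = 0; subtaskId < 3; subtaskId++) {
            sendBestEffort(sender, subtaskId, 1_000L).join();
        }
        System.out.println("ignored failures: " + ignoredFailures.get());
    }
}
```

With this shape, the coordinator would not need the {{hasNoMoreSplits}} guard to protect the periodic task: running subtasks keep receiving the new max allowed watermark, and a send that races with a subtask finishing is simply dropped.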