[ 
https://issues.apache.org/jira/browse/FLINK-37670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-37670:
--------------------------------------

    Assignee: Piotr Nowojski

> Watermark alignment can deadlock job if there are no more splits to be assigned
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-37670
>                 URL: https://issues.apache.org/jira/browse/FLINK-37670
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Task
>    Affects Versions: 2.0.0, 1.19.2, 1.20.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>
> {{SourceCoordinator#announceCombinedWatermark}} stops announcing the combined 
> watermark to a subtask once {{context.hasNoMoreSplits(subtaskId)}} returns true.
> {noformat}
>         for (Integer subtaskId : subTaskIds) {
>             // when subtask have been finished, do not send event.
>             if (!context.hasNoMoreSplits(subtaskId)) {
>                 // Subtask maybe during deploying or restarting, so we only send
>                 // WatermarkAlignmentEvent to ready task to avoid period task fail
>                 // (Java-ThreadPoolExecutor will not schedule the period task if it throws an
>                 // exception).
>                 context.sendEventToSourceOperatorIfTaskReady(
>                         subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
>             }
>         }
> {noformat}
> As a result, the new max allowed watermark is no longer announced, which 
> prevents sources from making progress. The intention was to prevent 
> failures caused by tasks not running:
> {noformat}
> Caused by: org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FINISHED
>       at org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1613) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
>       at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask(TaskExecutor.java:1478) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
>       at jdk.internal.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) ~[?:?]
>       at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
>       at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>       at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
>       at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
>       at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
>       at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
> {noformat}
> but:
> # The check is incorrect. Contrary to the comment, it doesn't check whether 
> subtasks have finished, only that the source coordinator has no more 
> splits to assign.
> # Even if it did check whether subtasks had finished, that wouldn't work 
> because:
>   * one subtask can be finished while others are still running
>   * there can be race conditions between sending an operator event to a 
> running operator and that operator switching to finished
> I think that, at least for some non-critical events, we need to just ignore 
> {{TaskNotRunningException}}.
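
The suggestion above could be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Flink change: TaskNotRunningException here is a hypothetical stand-in for the Flink class of the same name, and SubtaskEventSender and TolerantWatermarkAnnouncer are names invented for this example. The send is modelled as a synchronous call for brevity, which may not match how the coordinator really delivers operator events.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's TaskNotRunningException; not the real class. */
class TaskNotRunningException extends Exception {
    TaskNotRunningException(String message) {
        super(message);
    }
}

/** Hypothetical abstraction: delivers one watermark announcement to one subtask. */
interface SubtaskEventSender {
    void send(int subtaskId, long maxAllowedWatermark) throws Exception;
}

/** Announces to every subtask and tolerates subtasks that are no longer running. */
final class TolerantWatermarkAnnouncer {
    private final SubtaskEventSender sender;
    private final List<Integer> skipped = new ArrayList<>();

    TolerantWatermarkAnnouncer(SubtaskEventSender sender) {
        this.sender = sender;
    }

    /** Sends the watermark to all given subtasks; returns how many sends succeeded. */
    int announce(Iterable<Integer> subtaskIds, long maxAllowedWatermark) {
        int delivered = 0;
        for (int subtaskId : subtaskIds) {
            try {
                // No hasNoMoreSplits-style filter: every subtask is attempted.
                sender.send(subtaskId, maxAllowedWatermark);
                delivered++;
            } catch (TaskNotRunningException e) {
                // Non-critical event: a finished or restarting task is skipped,
                // so the periodic announcement keeps running for the others.
                skipped.add(subtaskId);
            } catch (Exception e) {
                // Any other failure is still propagated to the caller.
                throw new IllegalStateException(
                        "Failed to announce watermark to subtask " + subtaskId, e);
            }
        }
        return delivered;
    }

    /** Subtasks skipped so far because they were not running. */
    List<Integer> skippedSubtasks() {
        return skipped;
    }
}
```

With this shape, one finished subtask no longer stops the remaining subtasks from receiving the new max allowed watermark, and the race between sending an event and the task finishing results in a skipped send rather than a failed periodic task.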



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
