Piotr Nowojski created FLINK-37670:
--------------------------------------

             Summary: Watermark alignment can deadlock job if there are no more splits to be assigned
                 Key: FLINK-37670
                 URL: https://issues.apache.org/jira/browse/FLINK-37670
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Runtime / Task
    Affects Versions: 1.20.1, 1.19.2, 2.0.0
            Reporter: Piotr Nowojski


{{SourceCoordinator#announceCombinedWatermark}} stops announcing the combined 
watermark to a subtask once {{context.hasNoMoreSplits(subtaskId)}} returns true:

{noformat}
        for (Integer subtaskId : subTaskIds) {
            // when subtask have been finished, do not send event.
            if (!context.hasNoMoreSplits(subtaskId)) {
                // Subtask maybe during deploying or restarting, so we only send
                // WatermarkAlignmentEvent to ready task to avoid period task fail
                // (Java-ThreadPoolExecutor will not schedule the period task if it throws an
                // exception).
                context.sendEventToSourceOperatorIfTaskReady(
                        subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
            }
        }
{noformat}

As a result, the new max allowed watermark is no longer announced to those 
subtasks, which prevents the sources from making progress. The intention was to 
prevent failures caused by tasks not running:

{noformat}
Caused by: org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FINISHED
        at org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1613) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask(TaskExecutor.java:1478) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at jdk.internal.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
{noformat}

However:
# The check is incorrect. Contrary to the comment, it doesn't check whether all 
subtasks have finished, only whether the source coordinator has no more splits 
to assign.
# Even if it did check whether all subtasks had finished, that wouldn't work 
because:
  * one subtask can be finished while others are still running
  * there can be a race between sending an operator event to a running operator 
and that operator switching to finished
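
To illustrate the first point, here is a toy model of the situation. It is not 
Flink code: {{AlignmentToyModel}}, {{Subtask}} and {{announce}} are hypothetical 
names, and "paused" is simplified to "local watermark is ahead of the last limit 
received". It only shows that a subtask flagged with no more splits can still be 
reading, and stays paused if the loop skips it.

```java
import java.util.List;

// Toy model only: none of these types are Flink classes.
final class AlignmentToyModel {
    private AlignmentToyModel() {}

    /** A source subtask that may still be reading splits it was assigned earlier. */
    static final class Subtask {
        final boolean noMoreSplits;   // the coordinator has nothing further to assign
        long currentWatermark;        // watermark the subtask has reached locally
        long maxAllowedWatermark;     // last limit received from the coordinator

        Subtask(boolean noMoreSplits, long currentWatermark, long maxAllowedWatermark) {
            this.noMoreSplits = noMoreSplits;
            this.currentWatermark = currentWatermark;
            this.maxAllowedWatermark = maxAllowedWatermark;
        }

        /** Paused while the subtask is ahead of the last limit it was told about. */
        boolean isPaused() {
            return currentWatermark > maxAllowedWatermark;
        }
    }

    /** Same shape as the quoted loop: optionally skip subtasks flagged with noMoreSplits. */
    static void announce(
            List<Subtask> subtasks, long maxAllowedWatermark, boolean skipWhenNoMoreSplits) {
        for (Subtask subtask : subtasks) {
            if (skipWhenNoMoreSplits && subtask.noMoreSplits) {
                continue; // this subtask never learns the new limit
            }
            subtask.maxAllowedWatermark = maxAllowedWatermark;
        }
    }
}
```

In this model, a subtask created as {{new Subtask(true, 150L, 100L)}} remains 
paused after {{announce(subtasks, 200L, true)}}, and resumes only when the 
announcement is not skipped.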

I think that, at least for some non-critical events, we need to simply ignore 
{{TaskNotRunningException}}.
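
A minimal sketch of what "ignore" could look like, assuming the send result is 
available as a {{CompletableFuture}}. The {{TaskNotRunningException}} class below 
is a local stand-in so the example compiles on its own, and {{BestEffortEvents}} 
and {{ignoreTaskNotRunning}} are hypothetical names, not existing Flink API. 
Where such handling would actually belong in the coordinator is left open.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Local stand-in for Flink's TaskNotRunningException, declared here only so
// that the sketch is self-contained.
class TaskNotRunningException extends Exception {
    TaskNotRunningException(String message) {
        super(message);
    }
}

// Hypothetical helper, not part of Flink.
final class BestEffortEvents {
    private BestEffortEvents() {}

    /**
     * Returns a future that completes normally if the send succeeded or failed
     * only because the target task was not running. Any other failure propagates.
     */
    static CompletableFuture<Void> ignoreTaskNotRunning(CompletableFuture<Void> sendResult) {
        return sendResult.<Void>handle((ignored, failure) -> {
            Throwable cause = unwrap(failure);
            if (cause == null || cause instanceof TaskNotRunningException) {
                return null; // success, or the task is already gone: nothing to do
            }
            throw new CompletionException(cause);
        });
    }

    /** Strips CompletionException wrappers added by dependent future stages. */
    private static Throwable unwrap(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }
}
```

With a wrapper like this, a watermark announcement that races with a task 
switching to FINISHED would be dropped silently, while unrelated failures would 
still surface.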



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
