Thanks Till and Yang for the help! Also, thanks Till for the quick fix!

I did another test yesterday. In this test, I intentionally threw an exception
from the source operator:
if (runtimeContext.getIndexOfThisSubtask() == 1
        && errorFrenquecyInMin > 0
        && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
      lastStartTime = System.currentTimeMillis();
      throw new RuntimeException(
          "Trigger expected exception at: " + lastStartTime);
    }
In this case, I found that phase 1 still takes about 30s, while phase 2 dropped
to 1s (because no container allocation is needed). Why does phase 1 still take
30s even though no TM is lost?

Related logs:
2021-06-30 00:55:07,463 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
java.lang.RuntimeException: Trigger expected exception at: 1625014507446
2021-06-30 00:55:07,509 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
2021-06-30 00:55:37,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
2021-06-30 00:55:38,678 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        (time when all tasks switch from CREATED to RUNNING)
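
For reference, the phase durations I quote follow directly from the log timestamps above. Here is a minimal sketch of that arithmetic; the class and method names are made up for illustration, and I am assuming phase 1 runs from the moment the job leaves RUNNING until it leaves RESTARTING, and phase 2 from there until all tasks are RUNNING:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: computes the gap between two log timestamps.
public class PhaseDurations {
    // Timestamp layout of the log lines above, e.g. "2021-06-30 00:55:07,509".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    public static long millisBetween(String start, String end) {
        return Duration.between(
                LocalDateTime.parse(start, LOG_TIME),
                LocalDateTime.parse(end, LOG_TIME)).toMillis();
    }

    public static void main(String[] args) {
        String leftRunning = "2021-06-30 00:55:07,509";    // job switched from RUNNING
        String leftRestarting = "2021-06-30 00:55:37,596"; // job switched from RESTARTING
        String allRunning = "2021-06-30 00:55:38,678";     // all tasks RUNNING again

        // Phase 1 (assumed definition): time spent before the job leaves RESTARTING.
        System.out.println("phase 1 ms: " + millisBetween(leftRunning, leftRestarting)); // 30087
        // Phase 2 (assumed definition): time until all tasks are RUNNING.
        System.out.println("phase 2 ms: " + millisBetween(leftRestarting, allRunning));  // 1082
    }
}
```

So phase 1 is roughly 30.1s and phase 2 roughly 1.1s in this run.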

On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <> wrote:

> Thanks Till and Yang for the help! Also, thanks Till for the quick fix!
> I did another test yesterday. In this test, I intentionally threw an
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
>         && errorFrenquecyInMin > 0
>         && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>       lastStartTime = System.currentTimeMillis();
>       throw new RuntimeException(
>           "Trigger expected exception at: " + lastStartTime);
>     }
> ```
> In this case, I found that phase 1 still takes about 30s, while phase 2
> dropped to 1s (because no container allocation is needed).
> Some logs:
> ```
> ```
> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <> wrote:
>> A quick addition, I think with FLINK-23202 it should now also be possible
>> to improve the heartbeat mechanism in the general case. We can leverage the
>> unreachability exception thrown if a remote target is no longer reachable
>> to mark a heartbeat target as no longer reachable [1]. This can then be
>> considered as if the heartbeat timeout has been triggered. That way we
>> should detect lost TaskExecutors as fast as our heartbeat interval is.
>> [1]
>> Cheers,
>> Till
>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <> wrote:
>>> Since you are deploying Flink workloads on Yarn, the Flink
>>> ResourceManager should get the container
>>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
>>> which is 8 seconds by default.
>>> And the Flink ResourceManager will release the dead TaskManager container
>>> once it receives the completion event.
>>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>> I think most of the time cost in Phase 1 might be cancelling the tasks
>>> on the dead TaskManagers.
>>> Best,
>>> Yang
>>> On Thu, Jul 1, 2021 at 4:49 PM Till Rohrmann <> wrote:
>>>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>>>> the primary means to detect dead TaskManagers. This means that Flink will
>>>> take at least `heartbeat.timeout` time before the system recovers. Even if
>>>> the cancellation happens fast (e.g. by having configured a low
>>>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
>>>> TaskManager until it is marked as dead and its slots are released (unless
>>>> the ResourceManager gets a signal from the underlying resource
>>>> management system that a container/pod has died). One way to improve the
>>>> situation is to introduce logic which can react to a ConnectionException
>>>> and then blacklists or releases a TaskManager, for example. This is
>>>> currently not implemented in Flink, though.
>>>> Concerning the cancellation operation: Flink currently does not listen
>>>> to the dead letters of Akka. This means that the `akka.ask.timeout` is the
>>>> primary means to fail the future result of an RPC which could not be sent.
>>>> This is also an improvement we should add to Flink's RpcService. I've
>>>> created a JIRA issue for this problem [1].
>>>> [1]
>>>> Cheers,
>>>> Till
>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <> wrote:
>>>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>>> Best
>>>>> Lu
>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <> wrote:
>>>>>> I'm also wondering here.
>>>>>> In my opinion, it's because the JM cannot confirm whether the TM is
>>>>>> lost or whether it's a temporary network problem that will recover soon,
>>>>>> since I can see in the log that akka has got a Connection refused but
>>>>>> the JM still sends heartbeat requests to the lost TM until it reaches
>>>>>> the heartbeat timeout. But I'm not sure if it's indeed designed like this.
>>>>>> I would really appreciate it if anyone who knows more details could
>>>>>> answer. Thanks.
