Yes, you are right that `thenAcceptAsync` only breaks the control flow; it
does not guarantee that the `RestServer` has actually sent the response
to the client. Maybe we also need something similar to FLINK-10309 [1]. The
problem I see with this approach is that it makes all RestHandlers stateful.
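
To illustrate the first point with plain `CompletableFuture` semantics, here is
a minimal sketch. It is not Flink code: the class name, the event strings and
the hand-rolled queue executor are assumptions made up for this example. With
`thenAccept` the callback runs inline inside `complete()`. With
`thenAcceptAsync` it is only handed to an executor, so whether it runs before
or after the response is sent depends on scheduling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class AsyncCallbackSketch {

    /** thenAccept: the callback runs on the completing thread, inside complete(). */
    static List<String> withThenAccept() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
        cancelFuture.thenAccept(ignored -> events.add("shutdown triggered"));
        cancelFuture.complete(null);      // callback fires here
        events.add("response sent");      // too late: shutdown already started
        return events;
    }

    /**
     * thenAcceptAsync: complete() only enqueues the callback. The flag simulates
     * the two orders a real thread pool could produce.
     */
    static List<String> withThenAcceptAsync(boolean executorRunsFirst) {
        List<String> events = new ArrayList<>();
        Queue<Runnable> queued = new ArrayDeque<>();
        Executor manualExecutor = queued::add;   // hypothetical executor: just queues tasks
        CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
        cancelFuture.thenAcceptAsync(
                ignored -> events.add("shutdown triggered"), manualExecutor);
        cancelFuture.complete(null);      // callback is queued, not run
        if (executorRunsFirst) {
            drain(queued);                // the "pool" wins the race
        }
        events.add("response sent");
        drain(queued);                    // otherwise the callback runs afterwards
        return events;
    }

    private static void drain(Queue<Runnable> queued) {
        Runnable task;
        while ((task = queued.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        System.out.println(withThenAccept());
        System.out.println(withThenAcceptAsync(false));
        System.out.println(withThenAcceptAsync(true));
    }
}
```

Both orders are possible in the async case, which is why the change can hide
the symptom without fixing the race.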

[1] https://issues.apache.org/jira/browse/FLINK-10309
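
As a rough sketch of what "stateful" would mean here: a handler would have to
count its ongoing requests and complete its close future only once that count
drops to zero. The class below is purely hypothetical (its name and methods are
invented for this example and are not Flink's API); it only shows the
bookkeeping involved.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: tracks ongoing requests so that close can wait for them. */
class PendingRequestTracker {
    private final Object lock = new Object();
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    private int pending = 0;
    private boolean closing = false;

    /** Returns false if close has already been requested, so the request must be rejected. */
    boolean registerRequest() {
        synchronized (lock) {
            if (closing) {
                return false;
            }
            pending++;
            return true;
        }
    }

    /** Must be called once the response of a registered request has been written. */
    void deregisterRequest() {
        boolean lastOne;
        synchronized (lock) {
            pending--;
            lastOne = closing && pending == 0;
        }
        if (lastOne) {
            terminationFuture.complete(null);
        }
    }

    /** Stops accepting requests; the returned future completes when none are pending. */
    CompletableFuture<Void> closeAsync() {
        boolean idle;
        synchronized (lock) {
            closing = true;
            idle = pending == 0;
        }
        if (idle) {
            terminationFuture.complete(null);
        }
        return terminationFuture;
    }
}
```

The server shutdown would then be chained on the future returned by
`closeAsync()` instead of starting immediately.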

Cheers,
Till

On Fri, Mar 20, 2020 at 2:26 PM DONG, Weike <kyled...@connect.hku.hk> wrote:

> Hi Tison & Till,
>
> Changing *thenAccept* into *thenAcceptAsync* in
> MiniDispatcher#cancelJob does not solve the problem in my
> environment. However, I have found that adding a *Thread.sleep(2000)* before
> the return of JobCancellationHandler#handleRequest solves the problem (at
> least the symptom goes away). As this is only a dirty hack, I will try to
> find a more decent solution to this problem.
>
> Sincerely,
> Weike
>
> On Tue, Mar 17, 2020 at 11:11 PM tison <wander4...@gmail.com> wrote:
>
>> JIRA created as https://jira.apache.org/jira/browse/FLINK-16637
>>
>> Best,
>> tison.
>>
>>
>> On Tue, Mar 17, 2020 at 5:57 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> @Tison, could you create an issue to track the problem? Please also link
>>> the uploaded log file for further debugging.
>>>
>>> I think the reason why it worked in Flink 1.9 could have been that we
>>> had an async callback in the longer chain which broke the flow of execution
>>> and allowed the response to be sent. This is no longer the case. As an easy
>>> fix one could change thenAccept into thenAcceptAsync in
>>> MiniDispatcher#cancelJob. But this only treats the symptoms. Maybe we should
>>> think about allowing not only the StatusHandler to close asynchronously. At
>>> the moment we say that all other handlers shut down immediately (see
>>> AbstractHandler#closeHandlerAsync). But the problem with this change would
>>> be that all handlers would become stateful because they would need to
>>> remember whether a request is currently ongoing or not.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike <kyled...@connect.hku.hk>
>>> wrote:
>>>
>>>> Hi Tison & Till and all,
>>>>
>>>> I have uploaded the client, taskmanager and jobmanager logs to Gist
>>>> (https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and
>>>> I can reproduce this bug every time I try to cancel Flink 1.10 jobs on
>>>> YARN.
>>>>
>>>> Besides, in earlier Flink versions like 1.9, the REST API for
>>>> *cancelling a job with a savepoint* sometimes threw exceptions to the
>>>> client side due to early shutdown of the server, even though, judging
>>>> from the log, the savepoint had completed successfully. With the newly
>>>> introduced *stop* API that bug disappeared; however, the *cancel* API
>>>> seems to be buggy now.
>>>>
>>>> Best,
>>>> Weike
>>>>
>>>> On Tue, Mar 17, 2020 at 10:17 AM tison <wander4...@gmail.com> wrote:
>>>>
>>>> > edit: previously, after the cancellation we had a longer call chain to
>>>> > #jobReachedGloballyTerminalState, which archives the job and gracefully
>>>> > shuts down the JM. That might take some time so that ...
>>>> >
>>>> > Best,
>>>> > tison.
>>>> >
>>>> >
>>>> > On Tue, Mar 17, 2020 at 10:13 AM tison <wander4...@gmail.com> wrote:
>>>> >
>>>> >> Hi Weike & Till,
>>>> >>
>>>> >> I agree with Till, and it matches the analysis from my side. However,
>>>> >> it seems that even without FLINK-15116 it is still possible that we
>>>> >> complete the cancel future but the cluster gets shut down before it
>>>> >> properly delivers the response.
>>>> >>
>>>> >> One strange thing is that this behavior is almost always reproducible;
>>>> >> it should be one possible ordering, but not the only one. Maybe
>>>> >> previously we first had to cancel the job, which has a long call chain,
>>>> >> so that we happened to have enough time to deliver the response.
>>>> >>
>>>> >> The resolution looks like introducing some
>>>> >> synchronization/finalization logic that clears these outstanding
>>>> >> futures on a best-effort basis before the cluster (RestServer) goes down.
>>>> >>
>>>> >> Best,
>>>> >> tison.
>>>> >>
>>>> >>
>>>> >> On Tue, Mar 17, 2020 at 4:12 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>> >>
>>>> >>> Hi Weike,
>>>> >>>
>>>> >>> could you share the complete logs with us? Attachments are being
>>>> >>> filtered out by the Apache mail server but it works if you upload
>>>> the logs
>>>> >>> somewhere (e.g. https://gist.github.com/) and then share the link
>>>> with
>>>> >>> us. Ideally you run the cluster with DEBUG log settings.
>>>> >>>
>>>> >>> I assume that you are running Flink 1.10, right?
>>>> >>>
>>>> >>> My suspicion is that this behaviour has been introduced with
>>>> FLINK-15116
>>>> >>> [1]. It looks as if we complete the shutdown future in
>>>> >>> MiniDispatcher#cancelJob before we return the response to the
>>>> >>> RestClusterClient. My guess is that this triggers the shutdown of
>>>> the
>>>> >>> RestServer which then is not able to serve the response to the
>>>> client. I'm
>>>> >>> pulling in Aljoscha and Tison who introduced this change. They
>>>> might be
>>>> >>> able to verify my theory and propose a solution for it.
>>>> >>>
>>>> >>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>>> >>>
>>>> >>> Cheers,
>>>> >>> Till
>>>> >>>
>>>> >>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <
>>>> kyled...@connect.hku.hk>
>>>> >>> wrote:
>>>> >>>
>>>> >>>> Hi Yangze and all,
>>>> >>>>
>>>> >>>> I have tried numerous times, and this behavior persists.
>>>> >>>>
>>>> >>>> Below is the tail log of taskmanager.log:
>>>> >>>>
>>>> >>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  -
>>>> Free slot
>>>> >>>> TaskSlot(index:0, state:ACTIVE, resource profile:
>>>> >>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
>>>> >>>> (1613968148 bytes), taskOffHeapMemory=0 bytes,
>>>> managedMemory=1.403gb
>>>> >>>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>>>> >>>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>>>> >>>> d0a674795be98bd2574d9ea3286801cb).
>>>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove
>>>> job
>>>> >>>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>>>> JobManager
>>>> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>>>> JobManager
>>>> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot
>>>> reconnect
>>>> >>>> to job d0a674795be98bd2574d9ea3286801cb because it is not
>>>> registered.
>>>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>>> 15:
>>>> >>>> SIGTERM. Shutting down as requested.
>>>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>>> 15:
>>>> >>>> SIGTERM. Shutting down as requested.
>>>> >>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>>> >>>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down
>>>> BLOB
>>>> >>>> cache
>>>> >>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle
>>>> shutdown
>>>> >>>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl
>>>> -
>>>> >>>> FileChannelManager removed spill file directory
>>>> >>>>
>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>>>> >>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager
>>>> shutdown
>>>> >>>> hook] INFO
>>>> >>>>
>>>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>>>> >>>> Shutting down TaskExecutorLocalStateStoresManager.
>>>> >>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>>> >>>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down
>>>> BLOB
>>>> >>>> cache
>>>> >>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>>> >>>> FileChannelManager removed spill file directory
>>>> >>>>
>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>>>> >>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>>>> >>>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
>>>> >>>> directory
>>>> >>>>
>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>>> >>>>
>>>> >>>> As the tail log of jobmanager.log is kind of lengthy, I have
>>>> attached
>>>> >>>> it in this mail.
>>>> >>>>
>>>> >>>> From what I have seen, the TaskManager and JobManager shut down by
>>>> >>>> themselves, however, I have noticed some Netty exceptions (from
>>>> the stack
>>>> >>>> trace, it is part of the REST handler) like:
>>>> >>>>
>>>> >>>> ERROR
>>>> >>>>
>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>> >>>>  - Failed to submit a listener notification task. Event loop shut
>>>> down?
>>>> >>>> java.util.concurrent.RejectedExecutionException: event executor
>>>> >>>> terminated
>>>> >>>>
>>>> >>>> Thus I suppose that these exceptions might be the actual cause of
>>>> >>>> premature termination of the REST server, and I am still looking
>>>> into the
>>>> >>>> real cause of this.
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Weike
>>>> >>>>
>>>> >>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <karma...@gmail.com>
>>>> wrote:
>>>> >>>>
>>>> >>>>> Would you mind sharing more information about why the task executor
>>>> >>>>> is killed? If it is killed by Yarn, you might find such info in the
>>>> >>>>> Yarn NM/RM logs.
>>>> >>>>>
>>>> >>>>> Best,
>>>> >>>>> Yangze Guo
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <
>>>> kyled...@connect.hku.hk>
>>>> >>>>> wrote:
>>>> >>>>> >
>>>> >>>>> > Hi,
>>>> >>>>> >
>>>> >>>>> > Recently I have encountered a strange behavior of Flink on YARN,
>>>> >>>>> which is that when I try to cancel a Flink job running in per-job
>>>> mode on
>>>> >>>>> YARN using commands like
>>>> >>>>> >
>>>> >>>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
>>>> >>>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>>>> >>>>> >
>>>> >>>>> > the client happily found and connected to the ResourceManager and
>>>> >>>>> > then got stuck at
>>>> >>>>> > Found Web Interface 172.28.28.3:50099 of application
>>>> >>>>> 'application_1559388106022_9412'.
>>>> >>>>> >
>>>> >>>>> > And after one minute, an exception is thrown at the client side:
>>>> >>>>> > Caused by: org.apache.flink.util.FlinkException: Could not
>>>> cancel
>>>> >>>>> job ed7e2e0ab0a7316c1b65df6047bc6aae.
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>>> >>>>> >     at java.security.AccessController.doPrivileged(Native
>>>> Method)
>>>> >>>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>> >>>>> >     ... 20 more
>>>> >>>>> > Caused by: java.util.concurrent.TimeoutException
>>>> >>>>> >     at
>>>> >>>>>
>>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>>> >>>>> >     at
>>>> >>>>>
>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>>> >>>>> >     ... 27 more
>>>> >>>>> >
>>>> >>>>> > Then I discovered that the YARN app has already terminated with
>>>> >>>>> FINISHED state and KILLED final status, like below.
>>>> >>>>> >
>>>> >>>>> > And after digging into the log of this finished YARN app, I have
>>>> >>>>> found that TaskManager had already received the SIGTERM signal and
>>>> >>>>> terminated gracefully.
>>>> >>>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>>> 15:
>>>> >>>>> SIGTERM. Shutting down as requested.
>>>> >>>>> >
>>>> >>>>> > Also, the log of JobManager shows that it terminated with exit
>>>> code
>>>> >>>>> 0.
>>>> >>>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint
>>>> with
>>>> >>>>> exit code 0
>>>> >>>>> >
>>>> >>>>> > However, the JobManager did not return anything to the client
>>>> before
>>>> >>>>> its shutdown, which is different from previous versions (like
>>>> Flink 1.9).
>>>> >>>>> >
>>>> >>>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
>>>> >>>>> module?
>>>> >>>>> >
>>>> >>>>> > Thank you : )
>>>> >>>>> >
>>>> >>>>> > Sincerely,
>>>> >>>>> > Weike
>>>> >>>>>
>>>> >>>>
>>>>
>>>
