Yes, you are right: `thenAcceptAsync` only breaks the control flow, but it does not guarantee that the `RestServer` has actually sent the response to the client. Maybe we also need something similar to FLINK-10309 [1]. The problem I see with this approach is that it makes all RestHandlers stateful.
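
To make that concern a bit more concrete: being stateful would mean that every handler has to track whether a request is still in flight and only complete its termination future once the response has actually been written. A minimal sketch of the idea (the class and method names below are made up for illustration; this is not the existing AbstractHandler API):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only, not actual Flink code.
public class InFlightAwareHandler {

    // requests that have been received but whose responses are not yet written out
    private final AtomicInteger inFlightRequests = new AtomicInteger(0);

    // completed once the handler may really shut down
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    private volatile boolean closed = false;

    // called when a request starts being processed
    public void requestStarted() {
        inFlightRequests.incrementAndGet();
    }

    // called only after the response has actually been written to the client
    public void requestFinished() {
        if (inFlightRequests.decrementAndGet() == 0 && closed) {
            terminationFuture.complete(null);
        }
    }

    // instead of shutting down immediately, wait for outstanding responses first
    public CompletableFuture<Void> closeAsync() {
        closed = true;
        if (inFlightRequests.get() == 0) {
            terminationFuture.complete(null);
        }
        return terminationFuture;
    }
}

The unpleasant part is that every handler would carry this bookkeeping even though only the shutdown path cares about it.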
[1] https://issues.apache.org/jira/browse/FLINK-10309

Cheers,
Till

On Fri, Mar 20, 2020 at 2:26 PM DONG, Weike <kyled...@connect.hku.hk> wrote:

> Hi Tison & Till,
>
> Changing *thenAccept* into *thenAcceptAsync* in MiniDispatcher#cancelJob does not help to solve the problem in my environment. However, I have found that adding a *Thread.sleep(2000)* before the return of JobCancellationHandler#handleRequest solved the problem (at least the symptom goes away). As this is only a dirty hack, I will try to find a more decent solution to this problem.
>
> Sincerely,
> Weike
>
> On Tue, Mar 17, 2020 at 11:11 PM tison <wander4...@gmail.com> wrote:
>
>> JIRA created as https://jira.apache.org/jira/browse/FLINK-16637
>>
>> Best,
>> tison.
>>
>> On Tue, Mar 17, 2020 at 5:57 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> @Tison could you create an issue to track the problem? Please also link the uploaded log file for further debugging.
>>>
>>> I think the reason why it worked in Flink 1.9 could have been that we had an async callback in the longer chain which broke the flow of execution and allowed the response to be sent. This is no longer the case. As an easy fix one could change thenAccept into thenAcceptAsync in MiniDispatcher#cancelJob, but this only treats the symptoms. Maybe we should think about allowing not only the StatusHandlers to close asynchronously. At the moment we say that all other handlers shut down immediately (see AbstractHandler#closeHandlerAsync). The problem with this change would be that all handlers would become stateful, because they would need to remember whether a request is currently ongoing or not.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike <kyled...@connect.hku.hk> wrote:
>>>
>>>> Hi Tison & Till and all,
>>>>
>>>> I have uploaded the client, taskmanager and jobmanager logs to Gist (https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and I can reproduce this bug every time when trying to cancel Flink 1.10 jobs on YARN.
>>>>
>>>> Besides, in earlier Flink versions like 1.9, the REST API for *cancelling a job with a savepoint* sometimes threw exceptions to the client side due to early shutdown of the server, even though the log shows that the savepoint completed successfully. With the newly introduced *stop* API that bug disappeared, but now the *cancel* API seems to be buggy.
>>>>
>>>> Best,
>>>> Weike
>>>>
>>>> On Tue, Mar 17, 2020 at 10:17 AM tison <wander4...@gmail.com> wrote:
>>>>
>>>> > edit: previously, after the cancellation we had a longer call chain to #jobReachedGloballyTerminalState, which archives the job and shuts down the JM gracefully; that might take some time, so that ...
>>>> >
>>>> > Best,
>>>> > tison.
>>>> >
>>>> > On Tue, Mar 17, 2020 at 10:13 AM tison <wander4...@gmail.com> wrote:
>>>> >
>>>> >> Hi Weike & Till,
>>>> >>
>>>> >> I agree with Till; it is also the analysis from my side. However, it seems that even without FLINK-15116 it would still be possible that we complete the cancel future but the cluster gets shut down before it properly delivers the response.
>>>> >>
>>>> >> One strange thing is that this behavior is almost always reproducible, although it should be only one possible ordering, not the only one. Maybe previously we first had to cancel the job, which has a long call chain, so that we happened to have enough time to deliver the response.
>>>> >>
>>>> >> But the resolution looks like introducing some synchronization/finalization logic that clears these outstanding futures with best effort before the cluster (RestServer) goes down.
>>>> >>
>>>> >> Best,
>>>> >> tison.
>>>> >>
>>>> >> On Tue, Mar 17, 2020 at 4:12 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>> >>
>>>> >>> Hi Weike,
>>>> >>>
>>>> >>> could you share the complete logs with us? Attachments are being filtered out by the Apache mail server, but it works if you upload the logs somewhere (e.g. https://gist.github.com/) and then share the link with us. Ideally you run the cluster with DEBUG log settings.
>>>> >>>
>>>> >>> I assume that you are running Flink 1.10, right?
>>>> >>>
>>>> >>> My suspicion is that this behaviour has been introduced with FLINK-15116 [1]. It looks as if we complete the shutdown future in MiniDispatcher#cancelJob before we return the response to the RestClusterClient. My guess is that this triggers the shutdown of the RestServer, which then is not able to serve the response to the client. I'm pulling in Aljoscha and Tison who introduced this change. They might be able to verify my theory and propose a solution for it.
>>>> >>>
>>>> >>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>>> >>>
>>>> >>> Cheers,
>>>> >>> Till
>>>> >>>
>>>> >>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <kyled...@connect.hku.hk> wrote:
>>>> >>>
>>>> >>>> Hi Yangze and all,
>>>> >>>>
>>>> >>>> I have tried numerous times, and this behavior persists.
>>>> >>>>
>>>> >>>> Below is the tail of taskmanager.log:
>>>> >>>>
>>>> >>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)}, allocationId: d3acaeac3db62454742e800b5410adfd, jobId: d0a674795be98bd2574d9ea3286801cb).
>>>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job d0a674795be98bd2574d9ea3286801cb.
>>>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job d0a674795be98bd2574d9ea3286801cb.
>>>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot reconnect to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
>>>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>>> >>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
>>>> >>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown hook] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager removed spill file directory /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>>>> >>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown hook] INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
>>>> >>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
>>>> >>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager removed spill file directory /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>>>> >>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>>> >>>>
>>>> >>>> As the tail of jobmanager.log is kind of lengthy, I have attached it to this mail.
>>>> >>>>
>>>> >>>> From what I have seen, the TaskManager and JobManager shut down by themselves; however, I have noticed some Netty exceptions (judging from the stack trace, they come from the REST handlers) like:
>>>> >>>>
>>>> >>>> ERROR org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution - Failed to submit a listener notification task. Event loop shut down?
>>>> >>>> java.util.concurrent.RejectedExecutionException: event executor terminated
>>>> >>>>
>>>> >>>> Thus I suppose that these exceptions might be the actual cause of the premature termination of the REST server, and I am still looking into the real cause of this.
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Weike
>>>> >>>>
>>>> >>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <karma...@gmail.com> wrote:
>>>> >>>>
>>>> >>>>> Would you mind sharing more information about why the task executor is killed? If it is killed by Yarn, you might find such info in the Yarn NM/RM logs.
>>>> >>>>>
>>>> >>>>> Best,
>>>> >>>>> Yangze Guo
>>>> >>>>>
>>>> >>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <kyled...@connect.hku.hk> wrote:
>>>> >>>>> >
>>>> >>>>> > Hi,
>>>> >>>>> >
>>>> >>>>> > Recently I have encountered a strange behavior of Flink on YARN: when I try to cancel a Flink job running in per-job mode on YARN using commands like
>>>> >>>>> >
>>>> >>>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412 ed7e2e0ab0a7316c1b65df6047bc6aae"
>>>> >>>>> >
>>>> >>>>> > the client happily finds and connects to the ResourceManager and then gets stuck at
>>>> >>>>> > Found Web Interface 172.28.28.3:50099 of application 'application_1559388106022_9412'.
>>>> >>>>> >
>>>> >>>>> > And after one minute, an exception is thrown at the client side:
>>>> >>>>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel job ed7e2e0ab0a7316c1b65df6047bc6aae.
>>>> >>>>> >         at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>>> >>>>> >         at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>>> >>>>> >         at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>>> >>>>> >         at org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>>> >>>>> >         at org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>>> >>>>> >         at java.security.AccessController.doPrivileged(Native Method)
>>>> >>>>> >         at javax.security.auth.Subject.doAs(Subject.java:422)
>>>> >>>>> >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>> >>>>> >         ... 20 more
>>>> >>>>> > Caused by: java.util.concurrent.TimeoutException
>>>> >>>>> >         at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>>> >>>>> >         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>> >>>>> >         at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>>> >>>>> >         ... 27 more
>>>> >>>>> >
>>>> >>>>> > Then I discovered that the YARN app had already terminated with FINISHED state and KILLED final status, like below.
>>>> >>>>> >
>>>> >>>>> > And after digging into the log of this finished YARN app, I found that the TaskManager had already received the SIGTERM signal and terminated gracefully:
>>>> >>>>> > org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>>> >>>>> >
>>>> >>>>> > Also, the log of the JobManager shows that it terminated with exit code 0:
>>>> >>>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0
>>>> >>>>> >
>>>> >>>>> > However, the JobManager did not return anything to the client before its shutdown, which is different from previous versions (like Flink 1.9).
>>>> >>>>> >
>>>> >>>>> > I wonder if this is a new bug in the flink-clients or flink-yarn module?
>>>> >>>>> >
>>>> >>>>> > Thank you : )
>>>> >>>>> >
>>>> >>>>> > Sincerely,
>>>> >>>>> > Weike
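
PS: for readers following along, the thenAccept vs. thenAcceptAsync behaviour discussed above can be reproduced with plain CompletableFuture, independent of any Flink code. A small self-contained illustration (class name and executor are made up for the example):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThenAcceptVsThenAcceptAsync {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService otherExecutor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> cancelFuture = new CompletableFuture<>();

        // thenAccept: the callback runs on the thread that completes the future,
        // so complete() does not return until the callback has finished.
        cancelFuture.thenAccept(ack ->
                System.out.println("thenAccept runs on " + Thread.currentThread().getName()));

        // thenAcceptAsync: the callback is handed off to another executor,
        // so the completing thread returns from complete() right away.
        cancelFuture.thenAcceptAsync(ack ->
                System.out.println("thenAcceptAsync runs on " + Thread.currentThread().getName()),
                otherExecutor);

        cancelFuture.complete("Acknowledge");  // both callbacks fire here

        otherExecutor.shutdown();
        otherExecutor.awaitTermination(1, TimeUnit.SECONDS);
    }
}

As discussed above, handing the continuation off to another thread changes the timing, but it does not by itself guarantee that the response has left the RestServer before the cluster shuts down.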