LadyForest opened a new pull request, #21802: URL: https://github.com/apache/flink/pull/21802
## What is the purpose of the change

This pull request fixes an issue where `STOP JOB <job_identifier>` does not stop the job correctly. The issue was detected while reviewing https://github.com/apache/flink/pull/21717.

**Phenomenon**

`STOP JOB <job_identifier>` occasionally does not work as expected, and the following exception stack trace can be found in `sql-client.log`:

```text
2023-01-30 23:40:16,456 WARN org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel [] - Force-closing a channel whose registration task was not accepted by an event loop: [id: 0xe1c04c6e]
java.util.concurrent.RejectedExecutionException: event executor terminated
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:89) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:83) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:471) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:394) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:308) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.client.program.rest.RestClusterClient.lambda$null$38(RestClusterClient.java:962) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) [?:1.8.0_202]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) [?:1.8.0_202]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) [?:1.8.0_202]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) [?:1.8.0_202]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:580) [?:1.8.0_202]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2023-01-30 23:40:16,464 ERROR org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:475) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:394) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:308) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at org.apache.flink.client.program.rest.RestClusterClient.lambda$null$38(RestClusterClient.java:962) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) [?:1.8.0_202]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) [?:1.8.0_202]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) [?:1.8.0_202]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) [?:1.8.0_202]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:580) [?:1.8.0_202]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
```

**Root cause**

`OperationExecutor#runClusterAction` manages the `RestClusterClient` lifecycle with `try-with-resources`, so the client is closed immediately
after execution leaves the block, which shuts down its Netty `NioEventLoopGroup`. Because `#cancel` does not wait for its asynchronous REST request to complete, the request can be submitted after the event loop has already terminated, which produces the `RejectedExecutionException` above.

## Brief change log

Make `#cancel` block until the request completes or times out, the same as `#stopWithSavepoint`.

## Verifying this change

The existing test does not cover `STOP JOB`, so this PR first parameterizes it. The issue can be reproduced by reverting the changes to the production files and running the test: `STOP JOB` then gets stuck forever. The fix can be verified by reapplying it and running the test again.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): No
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: No
- The serializers: No
- The runtime per-record code paths (performance sensitive): No
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No
- The S3 file system connector: No

## Documentation

- Does this pull request introduce a new feature? No
- If yes, how is the feature documented? Not applicable
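The root cause and the fix described in this PR can be illustrated with a minimal, self-contained Java sketch. It does not use Flink. `SketchClient`, `cancelAsync`, `buggyCancel`, and `fixedCancel` are hypothetical names, and a plain `ExecutorService` stands in for the Netty `NioEventLoopGroup` owned by `RestClusterClient`. The buggy variant lets the future escape the `try-with-resources` block, so the request reaches the executor only after `close()` has shut it down. The fixed variant waits on the future with a timeout inside the block, which mirrors the idea of making `#cancel` block like `#stopWithSavepoint`. In this sketch the rejection surfaces as an exceptional completion. In the PR, by contrast, Netty only logged the rejection and `STOP JOB` could hang.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CancelLifecycleSketch {

    // Hypothetical stand-in for RestClusterClient (not Flink code). The eventLoop
    // executor plays the role of Netty's NioEventLoopGroup, shut down on close().
    static final class SketchClient implements AutoCloseable {
        private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        private final ExecutorService workerPool = Executors.newSingleThreadExecutor();

        // Asynchronous cancel: a preparatory stage runs on another pool first, and only
        // afterwards is the request handed to the event loop (compare the
        // CompletableFuture frames followed by RestClient.submitRequest in the log).
        CompletableFuture<String> cancelAsync(String jobId) {
            return CompletableFuture
                    .supplyAsync(() -> prepare(jobId), workerPool)
                    .thenCompose(id -> CompletableFuture.supplyAsync(() -> "cancelled " + id, eventLoop));
        }

        private static String prepare(String jobId) {
            try {
                Thread.sleep(100); // simulated latency before the request is sent
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return jobId;
        }

        @Override
        public void close() {
            eventLoop.shutdownNow(); // later submissions are rejected
            workerPool.shutdown();   // already-submitted preparation work still runs
        }
    }

    // Buggy pattern: the future escapes the try-with-resources block, so close()
    // shuts the event loop down before the request is submitted to it.
    static CompletableFuture<String> buggyCancel(String jobId) {
        try (SketchClient client = new SketchClient()) {
            return client.cancelAsync(jobId);
        }
    }

    // Fixed pattern: block, bounded by a timeout, until the request completes,
    // so the client is closed only after the event loop is no longer needed.
    static String fixedCancel(String jobId, long timeoutMs) throws Exception {
        try (SketchClient client = new SketchClient()) {
            return client.cancelAsync(jobId).get(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            buggyCancel("job-1").get(5, TimeUnit.SECONDS);
            System.out.println("buggy: completed");
        } catch (ExecutionException e) {
            System.out.println("buggy: " + e.getCause().getClass().getSimpleName());
        }
        System.out.println("fixed: " + fixedCancel("job-1", 5_000));
    }
}
```

Running `main` prints `buggy: RejectedExecutionException` and then `fixed: cancelled job-1`. Blocking with a timeout, rather than removing `try-with-resources`, keeps the client's lifetime scoped to the call while still bounding how long the call can wait.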