LadyForest opened a new pull request, #21802:
URL: https://github.com/apache/flink/pull/21802

   ## What is the purpose of the change
   
   This pull request fixes an issue where `STOP JOB <job_identifier>` does not 
stop the job correctly. The issue was found while reviewing 
https://github.com/apache/flink/pull/21717.
   
   **Phenomenon** 
   `STOP JOB <job_identifier>` occasionally does not work as expected, and the 
following exception stack trace can be found in sql-client.log:
   ```text
   2023-01-30 23:40:16,456 WARN  
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel [] - 
Force-closing a channel whose registration task was not accepted by an event 
loop: [id: 0xe1c04c6e]
   java.util.concurrent.RejectedExecutionException: event executor terminated
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:89)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:83)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:471) 
~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:394) 
~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:308) 
~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$38(RestClusterClient.java:962)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) 
[?:1.8.0_202]
        at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 [?:1.8.0_202]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
[?:1.8.0_202]
        at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) 
[?:1.8.0_202]
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:580)
 [?:1.8.0_202]
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 [?:1.8.0_202]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_202]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_202]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
   2023-01-30 23:40:16,464 ERROR 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
 [] - Failed to submit a listener notification task. Event loop shut down?
   java.util.concurrent.RejectedExecutionException: event executor terminated
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:475) 
~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:394) 
~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:308) 
~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$38(RestClusterClient.java:962)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) 
[?:1.8.0_202]
        at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 [?:1.8.0_202]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
[?:1.8.0_202]
        at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) 
[?:1.8.0_202]
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:580)
 [?:1.8.0_202]
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 [?:1.8.0_202]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_202]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_202]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
   ```
   **Root cause**
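   `OperationExecutor#runClusterAction` manages the `RestClusterClient` lifecycle 
via `try-with-resources`; the client is therefore closed as soon as the code 
block exits, which shuts down the Netty `NioEventLoopGroup`. Because `#cancel` 
returns without waiting for the request to finish, the request can be submitted 
to an event loop that has already terminated and is then rejected, as the 
stack trace above shows.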
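
The race described above can be illustrated with a minimal, self-contained sketch. It does not use Flink's classes: `FakeClient` is a hypothetical stand-in for `RestClusterClient`, and its single-thread executor plays the role of the Netty event loop group. The names `ClosedClientRace`, `cancel`, `cancelWithoutWaiting`, and `runWithoutWaiting` are assumptions made for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ClosedClientRace {

    /** Hypothetical stand-in for the REST client; not Flink's actual class. */
    static final class FakeClient implements AutoCloseable {
        // Plays the role of the Netty event loop group.
        private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

        /** Submits the "cancel" request only once the endpoint lookup has completed. */
        CompletableFuture<String> cancel(CompletableFuture<String> endpointLookup) {
            return endpointLookup.thenCompose(
                    endpoint -> CompletableFuture.supplyAsync(
                            () -> "cancelled via " + endpoint, eventLoop));
        }

        @Override
        public void close() {
            eventLoop.shutdownNow();
        }
    }

    /** Returns the future without waiting, so the client is closed while work is pending. */
    static CompletableFuture<String> cancelWithoutWaiting(CompletableFuture<String> lookup) {
        try (FakeClient client = new FakeClient()) {
            return client.cancel(lookup);
        }
    }

    /** Completes the lookup after the client is closed and reports what happened. */
    static String runWithoutWaiting() {
        CompletableFuture<String> lookup = new CompletableFuture<>();
        CompletableFuture<String> result = cancelWithoutWaiting(lookup);
        // The client is already closed here; the follow-up submission is rejected.
        lookup.complete("rest-endpoint");
        try {
            return result.join();
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithoutWaiting()); // prints "RejectedExecutionException"
    }
}
```

In this sketch the rejection is deterministic because the lookup is completed only after `close()` has run. In the real client the outcome depends on timing, which would be consistent with the failure being occasional.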
   
   ## Brief change log
   
   Make `#cancel` block until the request completes or times out, the same as `#stopWithSavepoint`.
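
The idea behind this change can be sketched as follows: wait for the future inside the `try-with-resources` block, with a timeout, so the client is closed only after the request has finished. This is a minimal illustration, not the code in this pull request. `FakeClient`, `BlockingCancel`, `slowLookup`, and `cancelAndWait` are hypothetical names, and the executor again stands in for the Netty event loop group.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingCancel {

    /** Hypothetical stand-in for the REST client; not Flink's actual class. */
    static final class FakeClient implements AutoCloseable {
        // Plays the role of the Netty event loop group.
        private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

        /** Two-step asynchronous request: a slow lookup, then the actual submission. */
        CompletableFuture<String> cancel(String jobId) {
            return CompletableFuture.supplyAsync(() -> slowLookup(jobId))
                    .thenCompose(id -> CompletableFuture.supplyAsync(
                            () -> "CANCELED " + id, eventLoop));
        }

        @Override
        public void close() {
            eventLoop.shutdownNow();
        }
    }

    /** Simulates latency before the request can be submitted. */
    private static String slowLookup(String jobId) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return jobId;
    }

    /** Blocks inside the try block, so close() runs only after the request is done. */
    static String cancelAndWait(String jobId, Duration timeout) {
        try (FakeClient client = new FakeClient()) {
            return client.cancel(jobId).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while cancelling " + jobId, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Could not cancel " + jobId, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelAndWait("job-1", Duration.ofSeconds(5))); // prints "CANCELED job-1"
    }
}
```

The timeout bounds how long the caller can be blocked if the request never completes, at the cost of turning a silent failure into an explicit exception.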
   
   
   ## Verifying this change
   
   The existing test does not cover `stop job`, so it is first made 
parameterized. The issue can be reproduced by rolling back the changes to the 
production files and then running the test: `stop job` gets stuck forever.
   
   The fix can be verified by applying the fix and running the test again.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): No
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: No
     - The serializers: No
     - The runtime per-record code paths (performance sensitive): No
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No
     - The S3 file system connector: No
   
   ## Documentation
     - Does this pull request introduce a new feature? No
     - If yes, how is the feature documented? Not Applicable
   

