Hi all,

When I cancel a job that has async functions, I see this sequence in the 
TaskManager logs:

2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from RUNNING to CANCELING.
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).

And then less than a second later...

2018-03-21 14:51:35,315 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
        at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:747)
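
For reference, the top two frames of that trace are plain JDK behavior: ThreadPoolExecutor.awaitTermination() throws InterruptedException when the thread that is waiting gets interrupted. Here is a minimal sketch that reproduces just that part outside of Flink. The class and method names are made up for illustration, and it says nothing about which thread issues the interrupt in the Flink case.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of Flink or of my job.
public class AwaitInterruptDemo {

    // Returns true if awaitTermination() threw InterruptedException
    // in the thread that was waiting on the pool.
    static boolean interruptedWhileAwaiting() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);

        // Keep the pool busy so that awaitTermination() cannot return early.
        pool.submit(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            pool.shutdown();
            try {
                pool.awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Same exception type as at the top of the trace above.
                sawInterrupt.set(true);
            }
        });

        waiter.start();
        waiter.interrupt();   // interrupt the waiting thread from outside
        waiter.join();

        release.countDown();  // let the worker finish so the pool can terminate
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waiter interrupted: " + interruptedWhileAwaiting());
    }
}
```

So the exception only tells me that the thread running StreamTask.invoke() was interrupted while waiting for the timer service's executor to terminate, not who interrupted it.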

Followed shortly thereafter by a call to the async function’s close() method, 
which logs:

2018-03-21 14:51:35,334 DEBUG com.scaleunlimited.utils.ThreadedExecutor - Shutting down the AsyncFunctionName thread pool

And finally…

2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from CANCELING to CANCELED.
2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).

I’ve looked through the code, and I don’t see any place where I’m interrupting 
any threads. When I shut down my own thread pool, interrupts will be generated, 
but only for threads used by my pool, and this happens after the 
InterruptedException.
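
To illustrate why I don't think my pool is the source: a minimal sketch, assuming a standard java.util.concurrent executor that is stopped with shutdownNow(). My actual ThreadedExecutor wrapper is not shown here, and the class and method names below are hypothetical. In this sketch the interrupt reaches the pool's worker thread, while the thread that calls shutdownNow() is left untouched.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; stands in for a pool shut down from close().
public class ShutdownNowScopeDemo {

    // Returns { workerWasInterrupted, callerWasInterrupted }.
    static boolean[] run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);

        // The worker reports whether it was interrupted while sleeping.
        Future<Boolean> workerInterrupted = pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(30_000);
                return false;
            } catch (InterruptedException e) {
                return true;
            }
        });

        started.await();     // make sure the task is running, not queued
        pool.shutdownNow();  // interrupts the pool's own worker threads

        boolean worker = workerInterrupted.get(10, TimeUnit.SECONDS);
        boolean caller = Thread.currentThread().isInterrupted();
        return new boolean[] { worker, caller };
    }

    public static void main(String[] args) throws Exception {
        boolean[] result = run();
        System.out.println("worker interrupted: " + result[0]);
        System.out.println("caller interrupted: " + result[1]);
    }
}
```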

Is this a known issue? Or is there something I can do to avoid it?

Thanks,

— Ken 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
