Hi all,

When I cancel a job that has async functions, I see this sequence in the TaskManager logs:
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from RUNNING to CANCELING.
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).

And then, less than a second later:

2018-03-21 14:51:35,315 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
	at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:747)

Followed shortly thereafter by a call to the async function’s close() method, which logs:

2018-03-21 14:51:35,334 DEBUG com.scaleunlimited.utils.ThreadedExecutor - Shutting down the AsyncFunctionName thread pool

And finally:

2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from CANCELING to CANCELED.
2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).

I’ve looked through the code, and I don’t see any place where I’m interrupting any threads.
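For context, here's a minimal, self-contained sketch of the point about pool shutdown (using a plain JDK ExecutorService rather than my actual ThreadedExecutor, so the names here are illustrative): shutdownNow() delivers interrupts only to the pool's own worker threads, never to the thread that calls it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownInterruptDemo {

    // Returns true if the pool's worker was interrupted while the
    // calling thread's own interrupt flag stayed clear.
    public static boolean demo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);

        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);    // simulate in-flight async work
            } catch (InterruptedException e) {
                interrupted.countDown(); // only the pool's worker sees this
            }
        });

        started.await();
        pool.shutdownNow(); // interrupts the pool's own workers only

        boolean workerInterrupted = interrupted.await(5, TimeUnit.SECONDS);
        boolean callerInterrupted = Thread.currentThread().isInterrupted();
        return workerInterrupted && !callerInterrupted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

So as far as I can tell, the interrupt that kills the timer service shutdown isn't coming from my pool's shutdown, which in any case happens later.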
When I shut down my own thread pool, interrupts are generated, but only for the threads used by my pool, and that happens after the InterruptedException above.

Is this a known issue? Or is there something I can do to avoid it?

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr