Hi all,
When I cancel a job that has async functions, I see this sequence in the
TaskManager logs:
2018-03-21 14:51:34,471 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
2018-03-21 14:51:34,471 INFO org.apache.flink.runtime.taskmanager.Task - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from RUNNING to CANCELING.
2018-03-21 14:51:34,471 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
And then less than a second later...
2018-03-21 14:51:35,315 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:747)
Followed shortly thereafter by a call to the async function’s close() method,
which logs:
2018-03-21 14:51:35,334 DEBUG com.scaleunlimited.utils.ThreadedExecutor - Shutting down the AsyncFunctionName thread pool
And finally…
2018-03-21 14:51:35,340 INFO org.apache.flink.runtime.taskmanager.Task - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from CANCELING to CANCELED.
2018-03-21 14:51:35,340 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
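For anyone trying to reproduce the top of that trace in isolation: ThreadPoolExecutor.awaitTermination() waits on an AbstractQueuedSynchronizer condition, and that wait throws InterruptedException if the waiting thread is interrupted before or during the wait, even when nothing in your own code calls interrupt(). The sketch below is plain java.util.concurrent, not Flink code. AwaitInterruptDemo and its method are hypothetical names, and it deliberately makes no claim about which component interrupts the task thread during cancellation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitInterruptDemo {

    // Returns true if awaitTermination() threw InterruptedException because
    // the calling thread had already been interrupted.
    public static boolean awaitTerminationWhileInterrupted() {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Keep the pool busy so it cannot terminate right away and
        // awaitTermination() has to wait on its internal condition.
        pool.submit(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the flag and exit
            }
        });
        pool.shutdown();

        // Simulate some other component interrupting the waiting thread.
        Thread.currentThread().interrupt();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return false;
        } catch (InterruptedException e) {
            // The interrupt flag has been cleared by the time this is thrown.
            // Real code would usually re-interrupt here; the demo just reports it.
            return true;
        } finally {
            pool.shutdownNow(); // stop the sleeping worker so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("InterruptedException thrown: " + awaitTerminationWhileInterrupted());
    }
}
```

Running main() should print "InterruptedException thrown: true", with the same two top frames (awaitNanos, then awaitTermination) as the logged trace.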
I’ve looked through the code, and I don’t see any place where I’m interrupting
any threads. When I shut down my own thread pool, interrupts will be generated,
but only for threads used by my pool, and this happens after the
InterruptedException.
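A minimal sketch of that last point, assuming the pool is a standard ExecutorService shut down with shutdownNow(). The actual com.scaleunlimited.utils.ThreadedExecutor isn't shown here, so this is not its implementation, and PoolShutdownDemo, Result, and shutDownBusyPool are hypothetical names. It shows that shutdownNow() interrupts the pool's own worker threads while the thread that calls it keeps a clear interrupt flag.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PoolShutdownDemo {

    // Outcome of shutting down the pool from the calling thread.
    public static final class Result {
        public final boolean workerInterrupted; // did the pool's thread see an interrupt?
        public final boolean callerInterrupted; // is the calling thread's flag set afterwards?
        public final boolean terminated;        // did the pool finish within the timeout?

        Result(boolean workerInterrupted, boolean callerInterrupted, boolean terminated) {
            this.workerInterrupted = workerInterrupted;
            this.callerInterrupted = callerInterrupted;
            this.terminated = terminated;
        }
    }

    public static Result shutDownBusyPool() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean workerInterrupted = new AtomicBoolean(false);

        // Stand-in for a long-running async request handled by the pool.
        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                workerInterrupted.set(true);
            }
        });
        started.await(); // the task is now running on a pool thread

        // shutdownNow() interrupts the pool's worker threads, not the caller.
        pool.shutdownNow();
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        boolean callerInterrupted = Thread.currentThread().isInterrupted();
        return new Result(workerInterrupted.get(), callerInterrupted, terminated);
    }

    public static void main(String[] args) throws InterruptedException {
        Result r = shutDownBusyPool();
        System.out.println("worker interrupted: " + r.workerInterrupted
                + ", caller interrupted: " + r.callerInterrupted
                + ", terminated: " + r.terminated);
    }
}
```

main() should report that the worker was interrupted, the caller was not, and the pool terminated.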
Is this InterruptedException a known issue? Or is there something I can do to avoid it?
Thanks,
— Ken
--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr