Hi Ken,
As you can see here [1], Flink interrupts the timer service after a
certain timeout. If you want to get rid of the exception, you should
increase "task.cancellation.timers.timeout" in the configuration.
Actually, the default is already set to 7 seconds, so your exception
should not be thrown so quickly. To me this looks like a bug, but please
let us know if setting the timeout higher solves your problem.
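The mechanism behind the stack trace can be reproduced outside Flink. The following is a minimal sketch, not Flink's actual code: one thread blocks in ExecutorService.awaitTermination() with a long timeout (standing in for the timers timeout), and another thread interrupts it well before that timeout elapses. The class name, method name, and durations are illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AwaitInterruptDemo {

    /** Returns "interrupted", "terminated", or "timeout" depending on how the wait ended. */
    public static String awaitWithInterrupt(long timeoutMs, long interruptAfterMs) throws Exception {
        ExecutorService timers = Executors.newSingleThreadExecutor();
        // A pending task that outlives the wait, so awaitTermination() has to block.
        timers.submit(() -> {
            try {
                Thread.sleep(timeoutMs * 2);
            } catch (InterruptedException ignored) {
                // Woken up by shutdownNow() at the end of the demo.
            }
        });

        AtomicReference<String> outcome = new AtomicReference<>();
        Thread waiter = new Thread(() -> {
            timers.shutdown();
            try {
                boolean done = timers.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
                outcome.set(done ? "terminated" : "timeout");
            } catch (InterruptedException e) {
                // Same exception type as in the reported stack trace.
                outcome.set("interrupted");
            }
        });
        waiter.start();

        // Interrupt the waiting thread from outside (a no-op if it already finished).
        Thread.sleep(interruptAfterMs);
        waiter.interrupt();
        waiter.join();

        timers.shutdownNow();
        return outcome.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(awaitWithInterrupt(7_000, 200));
    }
}
```

If the interrupt arrives before the timeout, the wait ends with an InterruptedException regardless of how large the timeout is, which is why an exception after less than a second is surprising with a multi-second default.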
Regards,
Timo
[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L358
On 21.03.18 at 23:29, Ken Krugler wrote:
Hi all,
When I cancel a job that has async functions, I see this sequence in
the TaskManager logs:
2018-03-21 14:51:34,471 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
2018-03-21 14:51:34,471 INFO org.apache.flink.runtime.taskmanager.Task - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from RUNNING to CANCELING.
2018-03-21 14:51:34,471 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
And then less than a second later...
2018-03-21 14:51:35,315 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:747)
Followed shortly thereafter by a call to the async function’s close()
method, which logs:
2018-03-21 14:51:35,334 DEBUG com.scaleunlimited.utils.ThreadedExecutor - Shutting down the AsyncFunctionName thread pool
And finally…
2018-03-21 14:51:35,340 INFO org.apache.flink.runtime.taskmanager.Task - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from CANCELING to CANCELED.
2018-03-21 14:51:35,340 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
I’ve looked through the code, and I don’t see any place where I’m
interrupting any threads. When I shut down my own thread pool,
interrupts will be generated, but only for threads used by my pool,
and this happens after the InterruptedException.
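The claim that shutting down a private pool only interrupts that pool's own threads can be checked with a small sketch. It assumes the pool is stopped with ExecutorService.shutdownNow(); how ThreadedExecutor is actually implemented is not shown here, so the class and method names below are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PoolInterruptScopeDemo {

    /** Returns {workerInterrupted, callerInterrupted} observed after shutdownNow(). */
    public static boolean[] interruptScope() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean workerInterrupted = new AtomicBoolean(false);

        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the pool's worker threads.
                workerInterrupted.set(true);
            }
        });

        started.await();                              // make sure the worker is running
        pool.shutdownNow();                           // interrupts workers only
        pool.awaitTermination(5, TimeUnit.SECONDS);   // wait for the worker to finish

        // The calling thread's interrupt flag is untouched by shutdownNow().
        boolean callerInterrupted = Thread.currentThread().isInterrupted();
        return new boolean[] { workerInterrupted.get(), callerInterrupted };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = interruptScope();
        System.out.println("worker interrupted: " + result[0]);
        System.out.println("caller interrupted: " + result[1]);
    }
}
```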
Is this a known issue? Or is there something I can do to avoid it?
Thanks,
— Ken
--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr