Hi Ken,

As you can see here [1], Flink interrupts the timer service after a certain timeout. If you want to get rid of the exception, you should increase "task.cancellation.timers.timeout" in the configuration.

Actually, the default is already set to 7 seconds, so the exception should not be thrown this quickly (less than a second after cancellation, according to your log). To me this looks like a bug, but please let us know whether setting a higher timeout solves your problem.
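To make the role of such a timeout concrete, here is a minimal, JDK-only sketch of a timer-service shutdown whose wait is bounded by a configurable timeout. It is not Flink's implementation: the class TimerShutdownSketch, the helper shutdownAndAwait, and its timeoutMs parameter are hypothetical, and timeoutMs merely plays the role of the value configured via "task.cancellation.timers.timeout". A pending timer callback that outlives the wait is still running when a short timeout expires, while a longer timeout lets it finish.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimerShutdownSketch {

    // Hypothetical helper (not Flink's API): stop accepting new timers, then wait
    // up to timeoutMs for pending timer callbacks. Returns true if the service
    // terminated in time, false if the timeout expired first.
    static boolean shutdownAndAwait(ScheduledExecutorService timers, long timeoutMs)
            throws InterruptedException {
        timers.shutdown();
        return timers.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Starts a single-threaded timer service whose only timer fires immediately
    // and then runs for about callbackMs milliseconds.
    static ScheduledExecutorService startBusyTimer(long callbackMs) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        timers.schedule(() -> {
            try {
                Thread.sleep(callbackMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }, 0, TimeUnit.MILLISECONDS);
        return timers;
    }

    public static void main(String[] args) throws InterruptedException {
        // Timeout shorter than the pending callback: the wait gives up early.
        boolean shortWait = shutdownAndAwait(startBusyTimer(300), 50);
        // Timeout comfortably longer than the callback: shutdown completes.
        boolean longWait = shutdownAndAwait(startBusyTimer(300), 5_000);
        System.out.println("50 ms timeout, terminated: " + shortWait);
        System.out.println("5000 ms timeout, terminated: " + longWait);
    }
}
```

Running main reports false for the 50 ms wait and true for the 5000 ms wait, because only the longer bound outlasts the 300 ms callback.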

Regards,
Timo


[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L358


On 21.03.18 at 23:29, Ken Krugler wrote:
Hi all,

When I cancel a job that has async functions, I see this sequence in the TaskManager logs:

2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from RUNNING to CANCELING.
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).

And then less than a second later...

2018-03-21 14:51:35,315 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Could not shut down timer service
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:747)
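The top frames of this trace (awaitNanos inside ThreadPoolExecutor.awaitTermination) are what the JDK produces when a thread blocked waiting for an executor to terminate is interrupted by some other thread. Below is a minimal, JDK-only reproduction of that pattern. The names InterruptedAwaitSketch and interruptWhileAwaiting are hypothetical; "taskThread" merely stands in for the thread that shuts down the timer service, and the main thread calling interrupt() stands in for whichever thread delivers the interrupt. It does not claim to show which Flink component sends the interrupt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptedAwaitSketch {

    // Returns the exception (if any) seen by a thread that is blocked in
    // awaitTermination() when another thread interrupts it.
    static Throwable interruptWhileAwaiting() throws InterruptedException {
        ExecutorService timers = Executors.newSingleThreadExecutor();
        CountDownLatch timerRunning = new CountDownLatch(1);
        // A long-running "timer" so the service cannot terminate on its own.
        timers.submit(() -> {
            timerRunning.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException ignored) {
                // woken by shutdownNow() during cleanup below
            }
        });
        timerRunning.await();

        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread taskThread = new Thread(() -> {
            timers.shutdown();
            try {
                timers.awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                seen.set(e); // same exception type as in the log above
            }
        });
        taskThread.start();
        Thread.sleep(100);      // give taskThread time to block in awaitTermination()
        taskThread.interrupt(); // the interrupt arrives from outside the executor
        taskThread.join();

        timers.shutdownNow();   // clean up: interrupt the still-sleeping timer
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable t = interruptWhileAwaiting();
        System.out.println("caught: " + t);
        if (t != null) {
            t.printStackTrace(); // compare these frames with the trace above
        }
    }
}
```

Nothing inside the timer task raises the interrupt here; the exception appears purely because the waiting thread was interrupted while blocked.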

The exception is followed shortly thereafter by a call to the async function’s close() method, which logs:

2018-03-21 14:51:35,334 DEBUG com.scaleunlimited.utils.ThreadedExecutor                  - Shutting down the AsyncFunctionName thread pool

And finally…

2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task - AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from CANCELING to CANCELED.
2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).

I’ve looked through the code, and I don’t see any place where I’m interrupting any threads. When I shut down my own thread pool, interrupts are generated, but only for my pool’s threads, and that happens after the InterruptedException has already been logged.
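The point about the thread pool can be checked in isolation: ExecutorService.shutdownNow() interrupts the pool's worker threads, not the thread that calls it. Below is a minimal, JDK-only sketch; PoolShutdownSketch and closePool are hypothetical names standing in for the async function's close() and its pool, not the actual ThreadedExecutor code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PoolShutdownSketch {

    // Hypothetical stand-in for close(): shut down a pool that has one in-flight
    // request. Returns {workerWasInterrupted, callerIsInterrupted}.
    static boolean[] closePool() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean workerInterrupted = new AtomicBoolean(false);
        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000); // simulates a slow async request
            } catch (InterruptedException e) {
                workerInterrupted.set(true);
            }
        });
        started.await();    // ensure the request is in flight
        pool.shutdownNow(); // interrupts the pool's worker threads only
        pool.awaitTermination(5, TimeUnit.SECONDS);
        // isInterrupted() reads the caller's flag without clearing it.
        boolean callerInterrupted = Thread.currentThread().isInterrupted();
        return new boolean[] {workerInterrupted.get(), callerInterrupted};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = closePool();
        System.out.println("worker interrupted: " + result[0]);
        System.out.println("caller interrupted: " + result[1]);
    }
}
```

Running main should report the worker as interrupted and the caller as not interrupted, which is consistent with a pool shutdown not interrupting the thread that performs it.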

Is this a known issue? Or is there something I can do to avoid it?

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

