[ https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569025#comment-15569025 ]
Shannon Carey commented on FLINK-4803:
--------------------------------------

Yes, that's right. cancel() blocks on close(), and therefore if close() misbehaves the thread is never interrupted and cancel() blocks forever.

In the issue description, I suggested your option #2. I think you'll want #1 no matter what. However, #2 allows for at least one message and/or exception to be logged that tells the user what went wrong (why their job is taking a long time to cancel, or why it did not cancel gracefully).

I'm not sure what your DataSink-specific option would look like. Maybe it is similar to my workaround, where I wrapped my HadoopOutputFormat in a subclass that calls super.close() from a separate thread with a timeout? That workaround is ok, but I had to expend a fair amount of effort to figure out what the problem was, and also there was nothing I could do but restart Flink in order to get my job to terminate (not a desirable solution). You'll want Flink to function smoothly regardless of what data sink the user chooses.

> Job Cancel can hang forever waiting for OutputFormat.close()
> ------------------------------------------------------------
>
>                 Key: FLINK-4803
>                 URL: https://issues.apache.org/jira/browse/FLINK-4803
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.1
>            Reporter: Shannon Carey
>
> If the Flink job uses a badly-behaved OutputFormat (in this example, a
> HadoopOutputFormat containing a CqlBulkOutputFormat), where the close()
> method blocks forever, it is impossible to cancel the Flink job even though
> the blocked thread would respond to an interrupt. The stack traces below show
> the state of the important threads when a job is canceled and the
> OutputFormat is blocking forever inside of close().
> I suggest that `DataSinkTask.cancel()` method be updated to add a timeout on
> `this.format.close()`. When the timeout is reached, the Task thread should be
> interrupted.
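
The timeout-on-close() idea (both the suggestion in the issue description and the workaround mentioned in the comment) could look roughly like the sketch below. This is only an illustration under assumptions: `TimedClose`, `Closer` and `closeWithTimeout` are hypothetical names, not Flink or Hadoop APIs, and the sketch only helps when the thread stuck in close() actually responds to an interrupt, as the issue description says it does in this case.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not part of Flink): bounds how long a close() call may take. */
final class TimedClose {

    /** Stand-in for any resource whose close() may block, e.g. an OutputFormat. */
    interface Closer {
        void close() throws IOException;
    }

    private TimedClose() {}

    /**
     * Runs closer.close() on a separate daemon thread and waits at most timeoutMillis.
     * Returns true if close() finished in time. Returns false if the wait timed out
     * (or the caller was interrupted); the closing thread is then interrupted.
     */
    static boolean closeWithTimeout(Closer closer, long timeoutMillis) throws IOException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "timed-close");
            t.setDaemon(true); // a stuck close() must not keep the JVM alive
            return t;
        });
        // The lambda returns a value, so it is submitted as a Callable and may throw IOException.
        Future<?> future = executor.submit(() -> {
            closer.close();
            return null;
        });
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the thread that is still inside close()
            return false;
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException("close() failed", cause);
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In a wrapper subclass like the one described in the comment, close() would delegate to something like `TimedClose.closeWithTimeout(...)` around `super.close()` and log a message when it returns false, so that the user can see why cancellation was slow or not graceful.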
> {code}
> "Canceler for DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6422 daemon prio=5 os_prio=0 tid=0x00007fb7e42f0000 nid=0x34f3 waiting for monitor entry [0x00007fb7be079000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
>         at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
>         at java.lang.Thread.run(Thread.java:745)
>
> "DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on condition [0x00007fb7bdf78000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>         at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>         at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>         at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
>         at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - locked <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)