[ 
https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shannon Carey updated FLINK-4803:
---------------------------------
    Description: 
If the Flink job uses a badly behaved OutputFormat (in this example, a 
HadoopOutputFormat containing a CqlBulkOutputFormat) whose close() method 
blocks forever, it is impossible to cancel the Flink job, even though the 
blocked thread would respond to an interrupt. The stack traces below show the 
state of the relevant threads when a job is canceled while the OutputFormat is 
blocked inside close().

I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on 
`this.format.close()`. When the timeout is reached, the Task thread should be 
interrupted.
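
To make the suggestion concrete, here is a minimal, self-contained sketch of the 
idea. It is not Flink's actual DataSinkTask code: `BlockingFormat`, 
`closeWithTimeout` and the 200 ms timeout are hypothetical names and values 
chosen for illustration only. The canceler runs close() on a helper thread, 
waits for a bounded time, and interrupts the task thread if close() has not 
returned by then.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CloseWithTimeoutSketch {

    /** Hypothetical stand-in for an OutputFormat whose close() blocks until interrupted. */
    static class BlockingFormat {
        private final Object lock = new Object();
        final CountDownLatch taskInsideClose = new CountDownLatch(1);

        void close() throws InterruptedException {
            synchronized (lock) {
                if (taskInsideClose.getCount() > 0) {
                    taskInsideClose.countDown();
                    // The first caller waits on a latch nobody releases, so it
                    // holds the lock until its thread is interrupted.
                    new CountDownLatch(1).await();
                }
            }
        }
    }

    /**
     * Assumed shape of the proposed fix: try close() with a time limit and
     * interrupt the task thread if the limit is reached.
     * Returns true if close() finished within the timeout.
     */
    static boolean closeWithTimeout(BlockingFormat format, Thread taskThread, long timeoutMillis)
            throws Exception {
        ExecutorService closer = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "format-closer");
            t.setDaemon(true); // never keep the JVM alive for a stuck close()
            return t;
        });
        try {
            Future<?> pending = closer.submit(() -> {
                format.close();
                return null;
            });
            try {
                pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // close() is still stuck behind the task thread: interrupt it.
                taskThread.interrupt();
                return false;
            }
        } finally {
            closer.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingFormat format = new BlockingFormat();
        Thread task = new Thread(() -> {
            try {
                format.close();
            } catch (InterruptedException e) {
                // The interrupt breaks the task out of the blocked close().
            }
        }, "task");
        task.setDaemon(true);
        task.start();
        format.taskInsideClose.await(); // the task now holds the lock inside close()

        boolean closedInTime = closeWithTimeout(format, task, 200);
        task.join(2000);
        System.out.println("closedInTime=" + closedInTime + ", taskAlive=" + task.isAlive());
    }
}
```

In this sketch the canceler itself never blocks on the monitor, so it is always 
able to fall back to the interrupt. Whether a helper thread is the right 
mechanism inside Flink, and what the timeout should be, are open design 
questions.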

{code}
"Canceler for DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6422 daemon prio=5 os_prio=0 tid=0x00007fb7e42f0000 nid=0x34f3 waiting for monitor entry [0x00007fb7be079000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
        - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
        at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
        at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
        at java.lang.Thread.run(Thread.java:745)

"DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on condition [0x00007fb7bdf78000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
        at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
        at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
        at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
        at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
        at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
        - locked <0x00000006bae5f788> (a java.lang.Object)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
{code}

  was:
If the Flink job uses a badly behaved OutputFormat (in this example, a 
HadoopOutputFormat containing a CqlBulkOutputFormat) whose close() method 
blocks forever, it is impossible to cancel the Flink job, even though the 
blocked thread would respond to an interrupt. The stack traces below show the 
state of the relevant threads when a job is canceled while the OutputFormat is 
blocked inside close().

I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on 
`this.format.close()`. When the timeout is reached, the Task thread should be 
interrupted.

{code}
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
        - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
        at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
        at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
        at java.lang.Thread.run(Thread.java:745)

"DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on condition [0x00007fb7bdf78000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
        at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
        at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
        at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
        at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
        at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
        - locked <0x00000006bae5f788> (a java.lang.Object)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
{code}


> Job Cancel can hang forever waiting for OutputFormat.close()
> ------------------------------------------------------------
>
>                 Key: FLINK-4803
>                 URL: https://issues.apache.org/jira/browse/FLINK-4803
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.1
>            Reporter: Shannon Carey
>
> If the Flink job uses a badly behaved OutputFormat (in this example, a 
> HadoopOutputFormat containing a CqlBulkOutputFormat) whose close() method 
> blocks forever, it is impossible to cancel the Flink job, even though the 
> blocked thread would respond to an interrupt. The stack traces below show the 
> state of the relevant threads when a job is canceled while the OutputFormat 
> is blocked inside close().
>
> I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout 
> on `this.format.close()`. When the timeout is reached, the Task thread should 
> be interrupted.
> {code}
> "Canceler for DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6422 daemon prio=5 os_prio=0 tid=0x00007fb7e42f0000 nid=0x34f3 waiting for monitor entry [0x00007fb7be079000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
>         at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
>         at java.lang.Thread.run(Thread.java:745)
>
> "DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on condition [0x00007fb7bdf78000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>         at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>         at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>         at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
>         at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - locked <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> {code}


