Hi Wangsan,

What you are observing is the intended behaviour of Flink when a job is
cancelled. The assumption is that if a job is cancelled, no clean shutdown
is required. Consequently, the tasks are interrupted in order to terminate
them as fast as possible.

The underlying problem is that Flink does not yet have a proper clean
shutdown command, i.e. a stop command which gracefully drains the processing
pipeline. The community has been thinking about this for quite some time,
and we hope to include it in one of the next releases.
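
As a workaround until then, a user function can at least tolerate the
interrupt in its close(). The following is only a rough sketch with made-up
names (it is not the actual BucketingSink code); it simply catches the
InterruptedIOException that a blocking HDFS close() throws once the task
thread has been interrupted:

import java.io.InterruptedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical sink for illustration only: it shows one way a close() can
// tolerate the interrupt that cancellation sends to the task thread.
public class InterruptTolerantHdfsSink extends RichSinkFunction<String> {

    private final String path;                  // e.g. "hdfs:///tmp/out"
    private transient FSDataOutputStream out;

    public InterruptTolerantHdfsSink(String path) {
        this.path = path;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        FileSystem fs = FileSystem.get(
                URI.create(path), new org.apache.hadoop.conf.Configuration());
        out = fs.create(new Path(path), true);
    }

    @Override
    public void invoke(String value) throws Exception {
        out.write((value + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close() throws Exception {
        try {
            // Closing the underlying DFSOutputStream may block while waiting
            // for pipeline acks.
            out.close();
        } catch (InterruptedIOException e) {
            // The task thread was interrupted (e.g. on cancellation): restore
            // the interrupt flag and give up on a clean close instead of
            // failing the operator disposal.
            Thread.currentThread().interrupt();
        }
    }
}

Note that this only avoids the noisy stack trace on cancellation; data that
was still waiting for acks may not end up in the file.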

Cheers,
Till

On Wed, Sep 27, 2017 at 8:55 AM, wangsan <wamg...@163.com> wrote:

> Hi all,
>
> We are currently using BucketingSink to save data into HDFS in parquet
> format. But when the Flink job is canceled, we always get an exception in
> BucketingSink’s close method. The detailed exception info is as below:
> [ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
> java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
>    at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
>    at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
>    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
>    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
>    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
>    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
>    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
>        …….
>    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>    at java.lang.Thread.run(Thread.java:745)
>
> After digging into the source code, we found that when a Flink job is
> canceled, a TaskCanceler thread is created. The TaskCanceler thread calls
> cancel() on the invokable and then periodically interrupts the task thread
> until it has terminated.
>
> try {
>   invokable.cancel();
> } catch (Throwable t) {
>   logger.error("Error while canceling the task {}.", taskName, t);
> }
>
> // ......
>
> executer.interrupt();
>
> try {
>   executer.join(interruptInterval);
> } catch (InterruptedException e) {
>   // we can ignore this
> }
>
> // ......
>
> Notice that TaskCanceler first sends an interrupt signal to the task thread
> and then calls join on it. Since the task thread is at that point trying to
> close the DFSOutputStream, which is waiting for acks, an InterruptedException
> is thrown in the task thread.
>
> synchronized (dataQueue) {
>   while (!streamerClosed) {
>     checkClosed();
>     if (lastAckedSeqno >= seqno) {
>       break;
>     }
>     try {
>       dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
>     } catch (InterruptedException ie) {
>       throw new InterruptedIOException(
>           "Interrupted while waiting for data to be acknowledged by pipeline");
>     }
>   }
> }
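>
> Stripped of everything Flink- and HDFS-specific, the pattern in question
> looks roughly like this (a made-up, standalone sketch; names are not from
> the Flink code base):
>
> // Bare interrupt-then-join idiom: interrupt() wakes the worker out of its
> // blocking wait, join() then waits a bounded time for it to terminate.
> public class InterruptThenJoinDemo {
>
>     public static void main(String[] args) throws InterruptedException {
>         final Object lock = new Object();
>
>         Thread worker = new Thread(() -> {
>             synchronized (lock) {
>                 try {
>                     lock.wait();          // blocked, like dataQueue.wait(1000)
>                 } catch (InterruptedException e) {
>                     System.out.println("worker: interrupted while waiting");
>                 }
>             }
>         });
>
>         worker.start();
>         Thread.sleep(100);    // let the worker reach the wait()
>
>         worker.interrupt();   // 1) interrupt the worker
>         worker.join(5000);    // 2) then wait (bounded) for it to exit
>     }
> }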
>
> I am confused about why TaskCanceler calls executer.interrupt() before
> executer.join(interruptInterval). Can anyone help?
>
> Best,
>
> wangsan
