Hi all,
We are currently using BucketingSink to save data into HDFS in parquet format.
But when the Flink job is canceled, we always get an exception in BucketingSink’s
close method. The detailed exception info is as follows:
[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
    at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
    ......
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
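
For context, our sink is set up roughly like the sketch below. This is only an illustration: ParquetSinkWriter is a placeholder name for our own Writer implementation that wraps a ParquetWriter, and stream, parquetSchema, the bucketer format, and the batch size are made-up values.

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.types.Row;

// Sketch only: ParquetSinkWriter is our own Writer<Row> that wraps a ParquetWriter.
BucketingSink<Row> sink = new BucketingSink<>("hdfs:///data/events");
sink.setBucketer(new DateTimeBucketer<Row>("yyyy-MM-dd--HH"));
sink.setWriter(new ParquetSinkWriter<Row>(parquetSchema)); // hypothetical custom writer
sink.setBatchSize(128 * 1024 * 1024);                      // roll part files at ~128 MB
stream.addSink(sink);
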
After digging into the source code, we found that when a Flink job is canceled, a
TaskCanceler thread is created. The TaskCanceler thread calls cancel() on the
invokable and then periodically interrupts the task thread until it has terminated:
try {
    invokable.cancel();
} catch (Throwable t) {
    logger.error("Error while canceling the task {}.", taskName, t);
}
// ......
executer.interrupt();
try {
    executer.join(interruptInterval);
} catch (InterruptedException e) {
    // we can ignore this
}
// ......
Notice that TaskCanceler first sends an interrupt signal to the task thread and
only then calls join on it. Since the task thread is at that moment trying to
close the DFSOutputStream, which is blocked waiting for acks from the pipeline,
an InterruptedIOException is thrown in the task thread:
synchronized (dataQueue) {
    while (!streamerClosed) {
        checkClosed();
        if (lastAckedSeqno >= seqno) {
            break;
        }
        try {
            dataQueue.wait(1000); // when we receive an ack, we notify on
                                  // dataQueue
        } catch (InterruptedException ie) {
            throw new InterruptedIOException(
                "Interrupted while waiting for data to be acknowledged by pipeline");
        }
    }
}
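
To double-check my understanding of the interaction, I wrote the small standalone sketch below (plain JDK code, no Flink or Hadoop classes; all names are made up). The canceling side interrupts the worker before joining it, so the worker's cleanup, which is blocked in wait() just like DFSOutputStream above, immediately gets an InterruptedException:

public class InterruptBeforeJoinDemo {
    private static final Object ackLock = new Object();

    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            // Simulates close() waiting for acks from the HDFS pipeline.
            synchronized (ackLock) {
                try {
                    ackLock.wait(10_000);
                    System.out.println("close() finished normally");
                } catch (InterruptedException e) {
                    System.out.println("close() interrupted while waiting for acks");
                }
            }
        });
        worker.start();
        Thread.sleep(100);   // let the worker reach wait()

        // Same order as TaskCanceler: interrupt first, then join.
        worker.interrupt();
        worker.join(5_000);
    }
}
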
I am confused about why TaskCanceler calls executer.interrupt() before
executer.join(interruptInterval). Shouldn't it join on the task thread first and
only interrupt it if the thread does not terminate within the interval? Can
anyone help?
Best,
wangsan