You're closing the CompressionOutputStream, which also closes the underlying HDFS stream, and then call super.close(), which tries to flush that underlying stream and fails because it is already closed.
If you don't close the underlying stream yourself the problem should disappear.
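Something like this should work (just a sketch, not tested; CompressionOutputStream.finish() writes the gzip trailer without closing the wrapped stream, so StreamWriterBase can still flush and close it itself):

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            // Finish the compressed stream without closing the underlying
            // FSDataOutputStream, which StreamWriterBase still needs.
            outStream.finish();
            outStream = null;
        }
        super.close(); // flushes and closes the underlying HDFS stream
    }

If you keep a reference to the Compressor, you may also want to return it via CodecPool.returnCompressor() when you're done, but that is unrelated to the exception you're seeing.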
On 03.09.2018 09:30, clay4444 wrote:
When I want to write compressed string data to HDFS, I found that Flink only
provides a plain StringWriter, so I wrote a custom writer, as follows:
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Objects;

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

public class StringCompressWriter<T> extends StreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;
    private transient Charset charset;
    private transient CompressionOutputStream outStream;

    public StringCompressWriter() {
        this("UTF-8");
    }

    public StringCompressWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected StringCompressWriter(StringCompressWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    /**
     * Opens the underlying stream and wraps it in a gzip CompressionOutputStream.
     */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.charset = Charset.forName(charsetName);

        Configuration conf = fs.getConf();
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");

        FSDataOutputStream dataOutputStream = getStream();
        Compressor compressor = CodecPool.getCompressor(codec, fs.getConf());
        outStream = codec.createOutputStream(dataOutputStream, compressor);
    }

    @Override
    public void write(T element) throws IOException {
        getStream(); // Throws if the stream is not open
        outStream.write(element.toString().getBytes(charset));
        outStream.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            outStream.close();
            // outStream = null;
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new StringCompressWriter<>(this);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), charsetName);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null) {
            return false;
        }
        if (getClass() != other.getClass()) {
            return false;
        }
        StringCompressWriter<T> writer = (StringCompressWriter<T>) other;
        // field comparison
        return Objects.equals(charsetName, writer.charsetName)
            && super.equals(other);
    }
}
But when I run my app on YARN, the TaskManager always reports the following
error:
2018-09-03 15:25:54,187 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING to FAILED.
java.nio.channels.ClosedChannelException
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
    at com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING to FAILING.
Moreover, when I looked at the output files, it seems the compressed stream was
not closed properly:
hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | wc -l
text: Unexpected end of ZLIB input stream
3268
Can someone tell me what happened?