When I wanted to write compressed string data to HDFS, I found that Flink only provides a StringWriter, so I implemented a custom writer, as follows:
public class StringCompressWriter<T> extends StreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;
    private transient Charset charset;
    private transient CompressionOutputStream outStream;

    public StringCompressWriter() {
        this("UTF-8");
    }

    public StringCompressWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected StringCompressWriter(StringCompressWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    /**
     * Opens the underlying stream and wraps it in a gzip compression stream.
     */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.charset = Charset.forName(charsetName);

        Configuration conf = fs.getConf();
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");

        FSDataOutputStream dataOutputStream = getStream();
        Compressor compressor = CodecPool.getCompressor(codec, fs.getConf());
        outStream = codec.createOutputStream(dataOutputStream, compressor);
    }

    @Override
    public void write(T element) throws IOException {
        getStream(); // throws if the stream is not open
        outStream.write(element.toString().getBytes(charset));
        outStream.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            outStream.close();
            // outStream = null;
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new StringCompressWriter<>(this);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), charsetName);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null) {
            return false;
        }
        if (getClass() != other.getClass()) {
            return false;
        }
        StringCompressWriter<T> writer = (StringCompressWriter<T>) other;
        // field comparison
        return Objects.equals(charsetName, writer.charsetName) && super.equals(other);
    }
}

But when I run my app on YARN, the TaskManager always reports the following error:

2018-09-03 15:25:54,187 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING to FAILED.
java.nio.channels.ClosedChannelException
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
    at com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING to FAILING.
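If it helps, my understanding is that closing the compression stream also closes the stream it wraps (the JDK's GZIPOutputStream behaves this way via DeflaterOutputStream.close()), and StreamWriterBase.close() then tries to hflush() that already-closed HDFS stream. A minimal JDK-only sketch of that close ordering, where TrackingStream is a stand-in I wrote for the HDFS stream (it is not a Hadoop class; real DFSOutputStream throws ClosedChannelException from checkClosed()):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CloseOrderDemo {

    // Stand-in for the HDFS FSDataOutputStream: remembers that it was
    // closed and refuses further flushes, like DFSOutputStream.checkClosed().
    static class TrackingStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }

        @Override
        public void flush() throws IOException {
            if (closed) {
                throw new IOException("stream already closed");
            }
            super.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        TrackingStream underlying = new TrackingStream();
        GZIPOutputStream gzip = new GZIPOutputStream(underlying);
        gzip.write("hello\n".getBytes(StandardCharsets.UTF_8));

        gzip.close(); // also closes `underlying`
        System.out.println("underlying closed: " + underlying.closed); // prints "underlying closed: true"

        try {
            // This is effectively what StreamWriterBase.close() does next.
            underlying.flush();
            System.out.println("flush ok");
        } catch (IOException e) {
            System.out.println("flush failed: " + e.getMessage());
        }
    }
}

So by the time super.close() runs in my writer, the part file's stream may already be gone.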
$ hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | wc -l
text: Unexpected end of ZLIB input stream
3268

Can someone tell me what happened?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/