You're closing the stream and then calling super.close(), which calls flush(); that fails because closing the CompressionOutputStream also closed the underlying HDFS stream.

If you don't close the underlying stream yourself, the problem should disappear; see the sketch below.
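For reference, a minimal sketch of a close() that avoids the double close. It assumes you keep the Compressor from open() in a transient field (instead of a local variable) so it can be returned to the pool; CompressionOutputStream.finish() writes the gzip trailer and flushes the compressed data without closing the wrapped FSDataOutputStream:

     @Override
     public void close() throws IOException {
         if (outStream != null) {
             // finish() completes the gzip stream (writes the trailer) but
             // leaves the underlying FSDataOutputStream open
             outStream.finish();
             outStream = null;
         }
         if (compressor != null) {
             // hypothetical transient field holding the pooled Compressor from open()
             CodecPool.returnCompressor(compressor);
             compressor = null;
         }
         // StreamWriterBase.close() is now the only one flushing and closing
         // the underlying HDFS stream
         super.close();
     }

That way StreamWriterBase.flush() no longer calls hflush() on an already-closed channel, and the gzip trailer still gets written, which should also fix the "Unexpected end of ZLIB input stream" you see with hdfs dfs -text.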

On 03.09.2018 09:30, clay4444 wrote:
When I want to write compressed string data to HDFS, I found that Flink only provides StringWriter, so I used a custom writer, as follows:

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Objects;

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

public class StringCompressWriter<T> extends StreamWriterBase<T> {

     private static final long serialVersionUID = 1L;

     private String charsetName;

     private transient Charset charset;

     private transient CompressionOutputStream outStream;


     public StringCompressWriter() {
         this("UTF-8");
     }

     public StringCompressWriter(String charsetName) {
         this.charsetName = charsetName;
     }

     protected StringCompressWriter(StringCompressWriter<T> other) {
         super(other);
         this.charsetName = other.charsetName;
     }


     /**
      * Opens the underlying stream and wraps it in a gzip-compressing stream.
      */
     @Override
     public void open(FileSystem fs, Path path) throws IOException {
         super.open(fs, path);

         this.charset = Charset.forName(charsetName);

         Configuration conf = fs.getConf();

         CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
         CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");

         FSDataOutputStream dataOutputStream = getStream();
         Compressor compressor = CodecPool.getCompressor(codec, fs.getConf());
         outStream = codec.createOutputStream(dataOutputStream, compressor);
     }

     @Override
     public void write(T element) throws IOException {
         getStream(); // Throws if the stream is not open
         outStream.write(element.toString().getBytes(charset));
         outStream.write('\n');
     }

     @Override
     public void close() throws IOException {
         if (outStream != null) {
             outStream.close();
//            outStream = null;
         }
         super.close();
     }

     @Override
     public Writer<T> duplicate() {
         return new StringCompressWriter<>(this);
     }

     @Override
     public int hashCode() {
         return Objects.hash(super.hashCode(), charsetName);
     }

     @Override
     public boolean equals(Object other) {
         if (this == other) {
             return true;
         }
         if (other == null) {
             return false;
         }
         if (getClass() != other.getClass()) {
             return false;
         }
         StringCompressWriter<T> writer = (StringCompressWriter<T>) other;
         // field comparison
         return Objects.equals(charsetName, writer.charsetName)
                 && super.equals(other);
     }
}
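
I plug the writer into the BucketingSink roughly like this (simplified; the base path and date bucketing match the part-file path shown below, and stream is the DataStream<String> being written):

     // simplified wiring; the actual job config may differ slightly
     BucketingSink<String> bucketSink = new BucketingSink<>("/tmp/Test");
     bucketSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd"));
     bucketSink.setWriter(new StringCompressWriter<>());
     stream.addSink(bucketSink).name("bucketSink");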

But when I run my app on YARN, the TaskManager always reports the following error:
2018-09-03 15:25:54,187 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING to FAILED.
java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
        at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
        at com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
        at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING to FAILING.

Moreover, when I looked at the data, I found that the stream did not seem to have been closed properly:

hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | wc -l

text: Unexpected end of ZLIB input stream
3268

Can someone tell me what happened?



