When I wanted to write compressed string data to HDFS, I found that Flink only
provides StringWriter, so I wrote a custom writer, as follows:

package com.vipkid.bigdata.sink;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Objects;

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

public class StringCompressWriter<T> extends StreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;

    private transient Charset charset;

    private transient CompressionOutputStream outStream;


    public StringCompressWriter() {
        this("UTF-8");
    }

    public StringCompressWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected StringCompressWriter(StringCompressWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }


    /**
     * Opens the underlying stream and wraps it in a Gzip-compressed output stream.
     */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);

        this.charset = Charset.forName(charsetName);

        Configuration conf = fs.getConf();

        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");

        FSDataOutputStream dataOutputStream = getStream();
        Compressor compressor = CodecPool.getCompressor(codec, fs.getConf());
        outStream = codec.createOutputStream(dataOutputStream, compressor);
    }

    @Override
    public void write(T element) throws IOException {
        getStream(); // Throws if the stream is not open
        outStream.write(element.toString().getBytes(charset));
        outStream.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            outStream.close();
//            outStream = null;
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new StringCompressWriter<>(this);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), charsetName);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null) {
            return false;
        }
        if (getClass() != other.getClass()) {
            return false;
        }
        StringCompressWriter<T> writer = (StringCompressWriter<T>) other;
        // field comparison
        return Objects.equals(charsetName, writer.charsetName)
                && super.equals(other);
    }
}
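
For context, the writer is attached to a BucketingSink roughly like this (a simplified sketch; the base path, bucket format and batch size below are placeholders, not my real configuration):

import com.vipkid.bigdata.sink.StringCompressWriter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class BucketSinkSetup {

    // Rough wiring of the custom writer into a BucketingSink.
    // Path, bucket format and batch size are placeholders.
    public static void addBucketSink(DataStream<String> stream) {
        BucketingSink<String> sink = new BucketingSink<>("/tmp/Test");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd"));
        sink.setWriter(new StringCompressWriter<String>());
        sink.setBatchSize(1024L * 1024L * 128L); // roll a new part file at ~128 MB
        stream.addSink(sink).name("bucketSink");
    }
}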

But when I run my app on YARN, the TaskManager always reports the following
error:
2018-09-03 15:25:54,187 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Sink: bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING to FAILED.
java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
        at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
        at com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
        at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING to FAILING.

Moreover, when I looked at the output data, the compressed stream does not seem
to have been closed properly:

hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | wc -l 

text: Unexpected end of ZLIB input stream
3268
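
The same check can be reproduced off HDFS with plain Java; a rough sketch, assuming the part file has been copied to the local path /tmp/part-8-96 (placeholder path):

import java.io.BufferedReader;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public class CheckPartFile {

    public static void main(String[] args) throws Exception {
        long lines = 0;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream("/tmp/part-8-96")),
                StandardCharsets.UTF_8))) {
            while (reader.readLine() != null) {
                lines++;
            }
        } catch (EOFException e) {
            // A gzip file that ends before its trailer was written fails here
            // with "Unexpected end of ZLIB input stream".
            System.out.println("truncated after " + lines + " lines: " + e.getMessage());
        }
    }
}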

Can someone tell me what happened?



