After checking the code, I think the issue might be in
AvroKeyValueSinkWriter.java: close() can leave the writer not fully
closed. I also noticed this change, which affects 1.5+:
https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6

@Override
public void close() throws IOException {
    if (keyValueWriter != null) {
        keyValueWriter.close();
    } else {
        // need to make sure we close this if we never created the Key/Value Writer.
        super.close();
    }
}
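
If I read it correctly, when keyValueWriter is non-null only
keyValueWriter.close() runs and super.close() is skipped, so the base
writer's output stream is never closed and reset; the next open() on that
writer then fails. A rough sketch of what I think is going on
(SimplifiedStreamWriter below is a made-up stand-in for the base class,
not actual Flink code):

import java.io.IOException;
import java.io.OutputStream;

public abstract class SimplifiedStreamWriter {
    private OutputStream outStream;

    public void open(OutputStream stream) {
        if (outStream != null) {
            // if close() never reset the stream, reopening fails here
            throw new IllegalStateException("Writer has already been opened");
        }
        outStream = stream;
    }

    public void close() throws IOException {
        if (outStream != null) {
            outStream.flush();
            outStream.close();
            outStream = null; // reset so the writer can be opened again
        }
    }
}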

I created my own AvroKeyValueSinkWriter class and implemented close()
similar to v1.4, and it seems to run fine now.

@Override
public void close() throws IOException {
    try {
        super.close();
    } finally {
        if (keyValueWriter != null) {
            keyValueWriter.close();
        }
    }
}
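
The idea is to keep the 1.4 ordering: super.close() runs first so the base
stream is always closed and reset, and keyValueWriter is closed in the
finally block so it still gets closed even if super.close() throws.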


I am curious if anyone has run into a similar issue, and I'd appreciate
any insights. Thanks!

Best,
Chengzhi

On Mon, Sep 17, 2018 at 12:01 PM Chengzhi Zhao <w.zhaocheng...@gmail.com>
wrote:

> Hi Flink Community,
>
> I am using Flink 1.6.0 with a BucketingSink writing to S3.
>
> After the application had been running for a while (~20 mins), I got an
> exception: java.lang.IllegalStateException: Writer has already been opened
>
> I have attached the job manager log and the sink code is here:
>
> val avroOutputPath = output
> var properties = new util.HashMap[String, String]()
> val stringSchema = Schema.create(Type.STRING)
> val keySchema = stringSchema.toString
> val valueSchema = schema.toString
>
> val compress = true
> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema)
> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema)
> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString)
> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, 
> DataFileConstants.SNAPPY_CODEC)
>
> val sink = new BucketingSink[tuple.Tuple2[String, R]](output)
> sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/"))
> sink.setBatchSize(1024 * 1024 * batchSize) // this is 64 MB
> sink.setPendingSuffix(".avro")
> sink.setBatchRolloverInterval(20 * 60 * 1000)
>
> def getWriter(): BucketingSink[tuple.Tuple2[String, R]] = {
>   val writer = new AvroKeyValueSinkWriter[String, R](properties)
>   sink.setWriter(writer)
>   sink
> }
>
> Any suggestions on why this could happen and how to debug it? Thanks for
> your help in advance!
>
> Regards,
> Chengzhi
>
>
>
