After checking the code, I think the issue might be in AvroKeyValueSinkWriter.java, leading to the writer not being closed completely. I also noticed this change, which affects 1.5+: https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6
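My reading (and I may well be wrong) is that the new close() never calls super.close() when keyValueWriter is set, so the base writer never releases its stream, and the next open() on that writer instance fails. Here is a rough sketch of what I assume the base class (StreamWriterBase) does; this is simplified, with a made-up class name, and is not the actual Flink source:

import java.io.IOException;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Sketch of the base writer behavior as I understand it (assumed, not copied from Flink).
public class StreamWriterSketch {

    private FSDataOutputStream outStream; // assumed field; only close() resets it

    public void open(FileSystem fs, Path path) throws IOException {
        if (outStream != null) {
            // This is the exception I am seeing in the job manager log.
            throw new IllegalStateException("Writer has already been opened");
        }
        outStream = fs.create(path, false);
    }

    public void close() throws IOException {
        if (outStream != null) {
            outStream.close();
            outStream = null; // if this reset is skipped, a later open() throws
        }
    }
}

If that is right, then any path through AvroKeyValueSinkWriter.close() that skips super.close() would leave the stream set and trigger the exception the next time the writer is opened.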
Here is close() after that commit:

@Override
public void close() throws IOException {
    if (keyValueWriter != null) {
        keyValueWriter.close();
    } else {
        // need to make sure we close this if we never created the Key/Value Writer.
        super.close();
    }
}

I created my own AvroKeyValueSinkWriter class and implemented close() the way v1.4 did; it seems to be running fine now:

@Override
public void close() throws IOException {
    try {
        super.close();
    } finally {
        if (keyValueWriter != null) {
            keyValueWriter.close();
        }
    }
}

I am curious whether anyone has run into a similar issue, and I would appreciate any insights on it. Thanks!

Best,
Chengzhi

On Mon, Sep 17, 2018 at 12:01 PM Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:

> Hi Flink Community,
>
> I am using Flink 1.6.0 and I am using BucketingSink to S3.
>
> After the application ran for a while (~20 mins), I got an exception:
> java.lang.IllegalStateException: Writer has already been opened
>
> I have attached the job manager log, and the sink code is here:
>
> val avroOutputPath = output
> var properties = new util.HashMap[String, String]()
> val stringSchema = Schema.create(Type.STRING)
> val keySchema = stringSchema.toString
> val valueSchema = schema.toString
>
> val compress = true
> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema)
> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema)
> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString)
> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC)
>
> val sink = new BucketingSink[tuple.Tuple2[String, R]](output)
> sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/"))
> sink.setBatchSize(1024 * 1024 * batchSize) // this is 64 MB
> sink.setPendingSuffix(".avro")
> sink.setBatchRolloverInterval(20 * 60 * 1000)
>
> def getWriter(): BucketingSink[tuple.Tuple2[String, R]] = {
>   val writer = new AvroKeyValueSinkWriter[String, R](properties)
>   sink.setWriter(writer)
>   sink
> }
>
> Any suggestions on why this could happen and how to debug it? Thanks for
> your help in advance!
>
> Regards,
> Chengzhi