Hi, thanks for putting some effort into debugging the problem. Could you open a Jira with the problem and your analysis so that we can discuss how to proceed with it?
Best, Stefan > Am 18.09.2018 um 23:16 schrieb Chengzhi Zhao <w.zhaocheng...@gmail.com>: > > After checking the code, I think the issue might be related to > AvroKeyValueSinkWriter.java and led to the writer has not been closed > completely. I also noticed this change and affect 1.5+ > https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6 > > <https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6> > > @Override > public void close() throws IOException { > if (keyValueWriter != null) { > keyValueWriter.close(); > } else { > // need to make sure we close this if we never created the Key/Value > Writer. > super.close(); > } > } > I created my own AvroKeyValueSinkWriter class and implement the code similar > as v1.4, it seems running fine now. > @Override > public void close() throws IOException { > try { > super.close(); > } finally { > if (keyValueWriter != null) { > keyValueWriter.close(); > } > } > } > > I am curious if anyone had the similar issue, Appreciate anyone has insights > on it. Thanks! > > Best, > Chengzhi > > On Mon, Sep 17, 2018 at 12:01 PM Chengzhi Zhao <w.zhaocheng...@gmail.com > <mailto:w.zhaocheng...@gmail.com>> wrote: > Hi Flink Community, > > I am using flink 1.6.0 and I am using BucketingSink to S3. > > After the application running for a while ~ 20 mins, I got an exception: > java.lang.IllegalStateException: Writer has already been opened > > I have attached the job manager log and the sink code is here: > val avroOutputPath = output > var properties = new util.HashMap[String, String]() > val stringSchema = Schema.create(Type.STRING) > val keySchema = stringSchema.toString > val valueSchema = schema.toString > > val compress = true > properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema) > properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema) > properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString) > properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, > DataFileConstants.SNAPPY_CODEC) > > val sink = new BucketingSink[tuple.Tuple2[String, R]](output) > sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/")) > sink.setBatchSize(1024 * 1024 * batchSize) // this is 64 MB, > sink.setPendingSuffix(".avro") > sink.setBatchRolloverInterval(20 * 60 * 1000) > > def getWriter(): BucketingSink[tuple.Tuple2[String, R]] = { > val writer = new AvroKeyValueSinkWriter[String, R](properties) > sink.setWriter(writer) > sink > } > Any suggestions on why this could happen and how to debug it? Thanks for your > help in advance! > > Regards, > Chengzhi > >