Thanks, Stefan, for replying. I created a JIRA ticket: https://issues.apache.org/jira/browse/FLINK-10382
Best,
Chengzhi

On Thu, Sep 20, 2018 at 7:49 AM Stefan Richter <s.rich...@data-artisans.com> wrote:

> Hi,
>
> thanks for putting some effort into debugging the problem. Could you open
> a Jira with the problem and your analysis so that we can discuss how to
> proceed with it?
>
> Best,
> Stefan
>
> Am 18.09.2018 um 23:16 schrieb Chengzhi Zhao <w.zhaocheng...@gmail.com>:
>
> After checking the code, I think the issue might be related to
> AvroKeyValueSinkWriter.java, which can lead to the writer not being
> closed completely. I also noticed this change, which affects 1.5+:
>
> https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6
>
> @Override
> public void close() throws IOException {
>     if (keyValueWriter != null) {
>         keyValueWriter.close();
>     } else {
>         // need to make sure we close this if we never created the Key/Value Writer.
>         super.close();
>     }
> }
>
> I created my own AvroKeyValueSinkWriter class and implemented close()
> the same way as v1.4; it seems to be running fine now.
>
> @Override
> public void close() throws IOException {
>     try {
>         super.close();
>     } finally {
>         if (keyValueWriter != null) {
>             keyValueWriter.close();
>         }
>     }
> }
>
> I am curious whether anyone has had a similar issue; I'd appreciate any
> insights on it. Thanks!
>
> Best,
> Chengzhi
>
> On Mon, Sep 17, 2018 at 12:01 PM Chengzhi Zhao <w.zhaocheng...@gmail.com>
> wrote:
>
>> Hi Flink Community,
>>
>> I am using Flink 1.6.0 with BucketingSink writing to S3.
>>
>> After the application has been running for a while (~20 minutes), I get
>> an exception:
>> java.lang.IllegalStateException: Writer has already been opened
>>
>> I have attached the job manager log, and the sink code is here:
>>
>> val avroOutputPath = output
>> var properties = new util.HashMap[String, String]()
>> val stringSchema = Schema.create(Type.STRING)
>> val keySchema = stringSchema.toString
>> val valueSchema = schema.toString
>>
>> val compress = true
>> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema)
>> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema)
>> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString)
>> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC)
>>
>> val sink = new BucketingSink[tuple.Tuple2[String, R]](output)
>> sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/"))
>> sink.setBatchSize(1024 * 1024 * batchSize) // this is 64 MB
>> sink.setPendingSuffix(".avro")
>> sink.setBatchRolloverInterval(20 * 60 * 1000)
>>
>> def getWriter(): BucketingSink[tuple.Tuple2[String, R]] = {
>>   val writer = new AvroKeyValueSinkWriter[String, R](properties)
>>   sink.setWriter(writer)
>>   sink
>> }
>>
>> Any suggestions on why this could happen and how to debug it? Thanks for
>> your help in advance!
>>
>> Regards,
>> Chengzhi
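
P.S. For anyone following along, below is a small, self-contained sketch of why a close() that skips super.close() can leave the base writer marked as open, so that the next open() on the same writer instance fails with "Writer has already been opened", and why the 1.4-style try/finally ordering avoids that. The BaseWriter/AvroLikeWriter classes here are hypothetical stand-ins I made up for illustration; this is not the actual Flink code, only a sketch of the open/close contract.

// Hypothetical, self-contained illustration -- not Flink code. It mimics a
// StreamWriterBase-like open/close contract to show why skipping super.close()
// makes a later open() fail with "Writer has already been opened".
public class WriterCloseDemo {

    // Stands in for the base stream writer: open() refuses to run twice
    // unless close() has reset the internal state.
    static class BaseWriter {
        private boolean open;

        public void open() {
            if (open) {
                throw new IllegalStateException("Writer has already been opened");
            }
            open = true;
        }

        public void close() {
            open = false; // releases/clears the underlying stream
        }
    }

    // Stands in for AvroKeyValueSinkWriter. The close() below mirrors the
    // 1.5+ behaviour: it only closes the inner writer and never calls
    // super.close(), so the base state is never reset.
    static class AvroLikeWriter extends BaseWriter {
        private AutoCloseable innerWriter;

        @Override
        public void open() {
            super.open();
            innerWriter = () -> { }; // pretend we created the key/value writer
        }

        @Override
        public void close() {
            if (innerWriter != null) {
                try {
                    innerWriter.close();
                } catch (Exception ignored) {
                }
                // BUG: super.close() is skipped, so the base stays "open"
            } else {
                super.close();
            }
        }

        // 1.4-style fix: always run super.close(), then close the inner writer.
        public void closeFixed() {
            try {
                super.close();
            } finally {
                if (innerWriter != null) {
                    try {
                        innerWriter.close();
                    } catch (Exception ignored) {
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        AvroLikeWriter w = new AvroLikeWriter();
        w.open();
        w.close();    // broken close(): base state is not reset
        try {
            w.open();  // reopening the same instance, as the sink does per bucket
        } catch (IllegalStateException e) {
            System.out.println("Reproduced: " + e.getMessage());
        }
    }
}

Calling closeFixed() instead of close() in the same sequence lets the second open() succeed, which matches what I see after switching to the v1.4-style close() in my own writer class.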