Hi,

thanks for putting some effort into debugging the problem. Could you open a 
Jira with the problem and your analysis so that we can discuss how to proceed 
with it?

Best,
Stefan

> On 18.09.2018, at 23:16, Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:
> 
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java, which can leave the writer not fully closed. I 
> also noticed this change, which affects 1.5+: 
> https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6
> 
> @Override
> public void close() throws IOException {
>     if (keyValueWriter != null) {
>         keyValueWriter.close();
>     } else {
>         // need to make sure we close this if we never created the Key/Value Writer.
>         super.close();
>     }
> }
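> If I read the 1.5+ code right, whenever keyValueWriter is non-null, 
> super.close() is never called, so the base writer's output stream stays open, 
> and the base writer then refuses to be reopened for the next bucket file. 
> That would explain the IllegalStateException. Roughly, the base stream 
> writer's guard looks like this (my paraphrase in Scala for illustration, not 
> the actual Flink source):
> 
> import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
> 
> class SketchStreamWriterBase {
>   private var outStream: FSDataOutputStream = _
> 
>   def open(fs: FileSystem, path: Path): Unit = {
>     if (outStream != null) {
>       // the exact message we see in the job manager log
>       throw new IllegalStateException("Writer has already been opened")
>     }
>     outStream = fs.create(path, false)
>   }
> 
>   def close(): Unit = {
>     if (outStream != null) {
>       outStream.close()
>       outStream = null // only a fully closed writer can be opened again
>     }
>   }
> }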
> I created my own AvroKeyValueSinkWriter class and implemented close() the 
> same way v1.4 did; it seems to be running fine now:
> @Override
> public void close() throws IOException {
>     try {
>         // always close the base writer and its underlying stream
>         super.close();
>     } finally {
>         // close the key/value writer too, even if super.close() threw
>         if (keyValueWriter != null) {
>             keyValueWriter.close();
>         }
>     }
> }
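> To use the fix I just swap my class in where the writer is set (the class 
> name below is only what I called my copy, nothing official):
> 
> // MyAvroKeyValueSinkWriter is my copied class with the close() above
> val writer = new MyAvroKeyValueSinkWriter[String, R](properties)
> sink.setWriter(writer)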
> 
> I am curious whether anyone has run into a similar issue; I'd appreciate any 
> insights. Thanks!
> 
> Best,
> Chengzhi
> 
> On Mon, Sep 17, 2018 at 12:01 PM Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:
> Hi Flink Community,
> 
> I am using Flink 1.6.0 with a BucketingSink writing to S3.
> 
> After the application had been running for a while (~20 mins), I got an 
> exception: 
> java.lang.IllegalStateException: Writer has already been opened
> 
> I have attached the job manager log, and the sink code is here:
> import java.util
> 
> import org.apache.avro.Schema
> import org.apache.avro.Schema.Type
> import org.apache.avro.file.DataFileConstants
> import org.apache.flink.api.java.tuple
> import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter
> import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
> 
> val avroOutputPath = output
> val properties = new util.HashMap[String, String]()
> val stringSchema = Schema.create(Type.STRING)
> val keySchema = stringSchema.toString
> val valueSchema = schema.toString
> 
> val compress = true
> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema)
> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema)
> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString)
> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC)
> 
> val sink = new BucketingSink[tuple.Tuple2[String, R]](output)
> sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/"))
> sink.setBatchSize(1024 * 1024 * batchSize) // 64 MB when batchSize = 64
> sink.setPendingSuffix(".avro")
> sink.setBatchRolloverInterval(20 * 60 * 1000) // 20 minutes
> 
> def getWriter(): BucketingSink[tuple.Tuple2[String, R]] = {
>   val writer = new AvroKeyValueSinkWriter[String, R](properties)
>   sink.setWriter(writer)
>   sink
> }
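> The sink is then attached to the stream like this (assuming `stream` is the 
> DataStream[tuple.Tuple2[String, R]] produced by the rest of the job):
> 
> // hypothetical wiring; `stream` comes from the rest of the job
> stream.addSink(getWriter())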
> Any suggestions on why this could happen and how to debug it? Thanks in 
> advance for your help!
> 
> Regards,
> Chengzhi
> 
> 
