Thanks, Stefan, for replying. I created a JIRA ticket:
https://issues.apache.org/jira/browse/FLINK-10382

Best,
Chengzhi

On Thu, Sep 20, 2018 at 7:49 AM Stefan Richter <s.rich...@data-artisans.com>
wrote:

> Hi,
>
> Thanks for putting some effort into debugging the problem. Could you open
> a JIRA ticket with the problem and your analysis so that we can discuss how
> to proceed with it?
>
> Best,
> Stefan
>
> On 18.09.2018, at 23:16, Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:
>
> After checking the code, I think the issue might be related to
> AvroKeyValueSinkWriter.java: the writer is not being closed completely. I
> also noticed this change, which affects 1.5+:
>
> https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6
>
> @Override
> public void close() throws IOException {
>     if (keyValueWriter != null) {
>         keyValueWriter.close();
>     } else {
>         // need to make sure we close this if we never created the Key/Value Writer.
>         super.close();
>     }
> }
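>
> For context, my reading of StreamWriterBase is that open() refuses to run
> while a previous output stream is still registered, roughly like this (a
> sketch from memory, not the exact Flink source):
>
> @Override
> public void open(FileSystem fs, Path path) throws IOException {
>     if (outStream != null) {
>         // the "Writer has already been opened" we are seeing
>         throw new IllegalStateException("Writer has already been opened");
>     }
>     outStream = fs.create(path, false);
> }
>
> As far as I can tell, BucketingSink reuses the same writer instance per
> bucket and calls open() again for the next part file, so skipping
> super.close() leaves outStream set and the next open() fails.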
>
> I created my own AvroKeyValueSinkWriter class and implemented close()
> similarly to v1.4 (so super.close() always runs), and it seems to be running
> fine now:
>
> @Override
> public void close() throws IOException {
>     try {
>         // always close the underlying stream first, as in v1.4
>         super.close();
>     } finally {
>         if (keyValueWriter != null) {
>             keyValueWriter.close();
>         }
>     }
> }
>
>
> I am curious whether anyone has run into a similar issue; I would appreciate
> any insights. Thanks!
>
> Best,
> Chengzhi
>
> On Mon, Sep 17, 2018 at 12:01 PM Chengzhi Zhao <w.zhaocheng...@gmail.com>
> wrote:
>
>> Hi Flink Community,
>>
>> I am using Flink 1.6.0 with a BucketingSink writing to S3.
>>
>> After the application had been running for a while (~20 mins), I got an
>> exception: java.lang.IllegalStateException: Writer has already been opened
>>
>> I have attached the job manager log, and the sink code is here:
>>
>> val avroOutputPath = output
>> val properties = new util.HashMap[String, String]()
>> val stringSchema = Schema.create(Type.STRING)
>> val keySchema = stringSchema.toString
>> val valueSchema = schema.toString
>>
>> val compress = true
>> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema)
>> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema)
>> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString)
>> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, 
>> DataFileConstants.SNAPPY_CODEC)
>>
>> val sink = new BucketingSink[tuple.Tuple2[String, R]](output)
>> sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/"))
>> sink.setBatchSize(1024 * 1024 * batchSize) // with batchSize = 64, this is 64 MB
>> sink.setPendingSuffix(".avro")
>> sink.setBatchRolloverInterval(20 * 60 * 1000)
>>
>> def getWriter(): BucketingSink[tuple.Tuple2[String, R]] = {
>>   val writer = new AvroKeyValueSinkWriter[String, R](properties)
>>   sink.setWriter(writer)
>>   sink
>> }
>>
>> Any suggestions on why this could happen and how to debug it? Thanks in
>> advance for your help!
>>
>> Regards,
>> Chengzhi
>>
>>
>>
>
