Hi, there,

I am using avro format and write data to S3, recently upgraded from Flink
1.3.2 to 1.5 and noticed the following errors as below:

I am using RocksDB and checkpointDataUri is an S3 location.
My writer looks like something below.

val writer = new AvroKeyValueSinkWriter[String, R](properties).duplicate()
sink.setWriter(writer.duplicate())



2018-07-24 17:50:44,012 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink:
Unnamed (4/4) (28f918a31d273e176409de3d4cb46c3c) switched from RUNNING
to FAILED.
java.lang.IllegalStateException: Writer has already been opened
        at 
org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69)
        at 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:151)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:561)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
        at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
        at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
2018-07-24 17:50:44,015 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Discarding checkpoint 28 of job cc73a9db44814dc3d5a5ce497c8b0389
because: Writer has already been opened
2018-07-24 17:50:44,016 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
Enrollment Log Member and Chapter (cc73a9db44814dc3d5a5ce497c8b0389)
switched from state RUNNING to FAILING.
java.lang.IllegalStateException: Writer has already been opened
        at 
org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69)
        at 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:151)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:561)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
        at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
        at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)

Any help would be greatly appreciated. Thanks!

Regards,
Chengzhi

Reply via email to