Hi Stephan,

I am facing an S3 consistency-related issue; the exception is pasted at the end.
We were able to solve the S3 sync issue by appending System.currentTimeMillis() to the inprogressPrefix, inprogressSuffix, s3PendingPrefix and s3PendingSuffix properties of BucketingSink. I also tried another approach: changing the BucketingSink code to append System.currentTimeMillis() to the partPath variable in the openNewPartFile method.

Can you please let me know whether this is the correct approach to get rid of this exception?

Exception:

TimerException{java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress
    at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:245)
    at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:201)
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:188)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:147)
    at org.apache.flink.streaming.connectors.fs.SequenceFileWriter.close(SequenceFileWriter.java:116)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)

Regards,
Vinay Patil
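For illustration, here is a minimal, self-contained sketch of the naming idea behind the second approach. It assumes the conflict comes from the same S3 key (such as part-0-0inprogress) being written more than once, which the EMRFS consistency checker then reports as a concurrent write. The class UniquePartPath and its partPath helper are hypothetical and are not part of Flink's BucketingSink; they only show how appending a timestamp turns two opens of the "same" part file into two distinct keys.

```java
/**
 * Hypothetical sketch, not Flink's BucketingSink code: builds part-file paths
 * that carry a timestamp so that reopening part 0 of subtask 0 (for example
 * after a restart) does not reuse an S3 key that was already written.
 */
public class UniquePartPath {

    // Follows the "part-<subtask>-<counter>" pattern seen in the exception,
    // plus a timestamp suffix, which is the change being discussed.
    static String partPath(String bucketPath, int subtaskIndex, int partCounter, long timestampMillis) {
        return bucketPath + "/part-" + subtaskIndex + "-" + partCounter + "-" + timestampMillis;
    }

    public static void main(String[] args) {
        String bucket = "s3://<bucket-name>";

        // Fixed timestamps keep the output deterministic; in a real job these
        // would come from System.currentTimeMillis() when the file is opened.
        String firstAttempt = partPath(bucket, 0, 0, 1_000L);
        String retriedAttempt = partPath(bucket, 0, 0, 2_000L);

        System.out.println(firstAttempt);
        System.out.println(retriedAttempt);
        System.out.println(firstAttempt.equals(retriedAttempt) ? "same key" : "distinct keys");
    }
}
```

One caveat with this design: two part files opened within the same millisecond would still get the same name, so a counter or attempt identifier would give a stronger uniqueness guarantee than the timestamp alone.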