I tried reproducing the issue using the org.apache.hadoop.fs.s3a.S3AFileSystem and it worked. I had some dependency issues with the S3AFileSystem so I didn't follow that path for now. If you've used the S3AFileSystem, I can try to get that one working as well.
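For context, the implementation behind `s3://` URIs comes from the Hadoop configuration Flink picks up. A core-site.xml fragment routing `s3://` to S3A might look roughly like the following — a sketch based on the usual Hadoop/Flink S3 setup, with placeholder credential values:

```xml
<configuration>
  <!-- Route s3:// URIs to the S3A filesystem implementation -->
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <!-- Placeholder credentials; substitute your own keys -->
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```

Knowing which of these you set would help narrow down whether the filesystem implementation is a factor.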
On Tue, May 17, 2016 at 11:59 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
>
> from the code you've provided, everything seems to look okay. I'm
> currently trying to reproduce the issue.
> Which Flink version are you using?
>
> Which s3 implementation did you configure in the hadoop configuration?
>
> Regards,
> Robert
>
>
> On Mon, May 16, 2016 at 11:52 PM, Chen Qin <qinnc...@gmail.com> wrote:
>
>> Hi there,
>>
>> I have been testing checkpointing on rocksdb backed by s3. Checkpoints
>> seem successful except for the snapshot state of a timeWindow operator on a
>> keyedStream. Here is the env setting I used:
>>
>> env.setStateBackend(new RocksDBStateBackend(new URI("s3://backupdir/")))
>>
>> The checkpoint always fails consistently when it gets to the window
>> operator snapshot. Exception log attached below.
>> With env.setStateBackend(new RocksDBStateBackend(new URI("file:///tmp/checkpoints")))
>> or MemoryStateBackend (the default), checkpoints work with no issue.
>>
>> Has anyone seen this issue before? Or did I mess up the configuration?
>>
>> Thanks,
>> Chen
>>
>> 2016-05-16 17:20:32,132 INFO  org.apache.flink.runtime.state.filesystem.FsStateBackend - Initializing file state backend to URI s3://xxx/checkpoints/7e6abf126ce3d18f173733b34eda81a9
>> 2016-05-16 17:20:32,423 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: org.apache.flink.contrib.streaming.state.RocksDBStateBackend@2fa68a53
>> 2016-05-16 17:20:32,423 INFO  org.apache.flink.runtime.state.filesystem.FsStateBackend - Initializing file state backend to URI s3://uber-beats/sjc1/checkpoints/7e6abf126ce3d18f173733b34eda81a9
>> 2016-05-16 17:21:31,423 INFO  org.apache.flink.contrib.streaming.state.AbstractRocksDBState - RocksDB (/directory/flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/dbc64864de-8373-4b41-bd74-a26a8007f066) backup (synchronous part) took 8 ms.
>> 2016-05-16 17:21:36,125 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while materializing asynchronous checkpoints.
>> com.amazonaws.AmazonClientException: Unable to calculate MD5 hash:/directory//flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/local-chk-599 (Is a directory)
>>     at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>>     at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>>     at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> ...
>>
>> The test looks like:
>>
>> .setParallelism(1)
>> .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
>>     @Override
>>     public Watermark checkAndGetNextWatermark(String s, long l) {
>>         long ts = System.currentTimeMillis() - 60 * 1000L;
>>         return new Watermark(ts);
>>     }
>>
>>     @Override
>>     public long extractTimestamp(String s, long l) {
>>         return System.currentTimeMillis();
>>     }
>> })
>> .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
>>     @Override
>>     public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
>>         collector.collect(new Tuple2<>(s, 1L));
>>     }
>> })
>> .keyBy(0)
>> .timeWindow(Time.seconds(60))
>> .apply(new RichWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
>>     @Override
>>     public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
>>         log.info("trigger fire at {}", System.currentTimeMillis());
>>         collector.collect(new Tuple2<>(timeWindow.toString(), 1L));
>>     }
>> })
>> .rebalance()
>> .addSink(new FakeSink<>());
>>
>> JobExecutionResult result = env.execute();
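One more observation on the exception itself: "Unable to calculate MD5 hash: ... (Is a directory)" suggests the S3 client was handed the local RocksDB checkpoint *directory* where it expected a single file, since opening a directory for reading fails exactly this way on Linux. A plain-JDK sketch of that failure mode (class and method names here are illustrative, not from the thread):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;

public class Md5DirectoryRepro {

    /**
     * Tries to open the given path for reading, as an uploader would
     * before hashing its contents. Returns the failure message, or
     * null if the path opened fine (i.e. it was a regular file).
     */
    static String tryOpenForHashing(File f) {
        try (FileInputStream in = new FileInputStream(f)) {
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // A temp directory stands in for RocksDB's local checkpoint dir
        // (".../window-contents/local-chk-599" in the log above).
        File dir = Files.createTempDirectory("local-chk").toFile();
        // On Linux this typically prints "<path> (Is a directory)",
        // matching the tail of the AmazonClientException message.
        System.out.println(tryOpenForHashing(dir));
    }
}
```

If that is what is happening, the RocksDB backend would need to upload the directory's contents file by file (or as an archive) rather than passing the directory path straight to `putObject`.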