Hi there,

I have been testing checkpointing with RocksDB backed by S3. Checkpoints seem successful except for the snapshot state of the timeWindow operator on a keyedStream. Here is the env setting I used:

    env.setStateBackend(new RocksDBStateBackend(new URI("s3://backupdir/")));
The checkpoint always fails, consistently, when it reaches the window operator snapshot; the exception log is attached below. If I instead use

    env.setStateBackend(new RocksDBStateBackend(new URI("file:///tmp/checkpoints")));

or the MemoryStateBackend (the default), checkpoints work with no issue. Has anyone seen this issue before, or did I mess up the configuration?

Thanks,
Chen

2016-05-16 17:20:32,132 INFO  org.apache.flink.runtime.state.filesystem.FsStateBackend - Initializing file state backend to URI s3://xxx/checkpoints/7e6abf126ce3d18f173733b34eda81a9
2016-05-16 17:20:32,423 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: org.apache.flink.contrib.streaming.state.RocksDBStateBackend@2fa68a53
2016-05-16 17:20:32,423 INFO  org.apache.flink.runtime.state.filesystem.FsStateBackend - Initializing file state backend to URI s3://uber-beats/sjc1/checkpoints/7e6abf126ce3d18f173733b34eda81a9
2016-05-16 17:21:31,423 INFO  org.apache.flink.contrib.streaming.state.AbstractRocksDBState - RocksDB (/directory/flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/dbc64864de-8373-4b41-bd74-a26a8007f066) backup (synchronous part) took 8 ms.
2016-05-16 17:21:36,125 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while materializing asynchronous checkpoints.
com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /directory//flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/local-chk-599 (Is a directory)
	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
	at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
	at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	...

The test looks like this:

    .setParallelism(1)
    .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
        @Override
        public Watermark checkAndGetNextWatermark(String s, long l) {
            long ts = System.currentTimeMillis() - 60 * 1000L;
            return new Watermark(ts);
        }

        @Override
        public long extractTimestamp(String s, long l) {
            return System.currentTimeMillis();
        }
    })
    .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
            collector.collect(new Tuple2<>(s, 1L));
        }
    })
    .keyBy(0)
    .timeWindow(Time.seconds(60))
    .apply(new RichWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable,
                          Collector<Tuple2<String, Long>> collector) throws Exception {
            log.info("trigger fire at {}", System.currentTimeMillis());
            collector.collect(new Tuple2<>(timeWindow.toString(), 1L));
        }
    })
    .rebalance()
    .addSink(new FakeSink<>());
    JobExecutionResult result = env.execute();
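For what it's worth, the path in the exception (.../local-chk-599) looks like the directory RocksDB writes its local backup into, and as far as I know the AWS SDK's single-object upload path needs a regular file to read bytes (and an MD5) from, so handing it a directory can't work. This is just my guess at the failure mode; a minimal plain-JDK sketch of that precondition (the temp-dir name below is made up for illustration):

```java
import java.io.File;
import java.nio.file.Files;

public class DirectoryUploadCheck {
    public static void main(String[] args) throws Exception {
        // RocksDB materializes its checkpoint as a directory tree (e.g. .../local-chk-599),
        // not as a single file. A single-object S3 upload reads a regular file to compute
        // the MD5 content hash, so a directory path fails that step.
        File chk = Files.createTempDirectory("local-chk-demo").toFile();
        System.out.println(chk.isDirectory()); // prints "true" -- it is a directory
        System.out.println(chk.isFile());      // prints "false" -- not a regular file
    }
}
```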