Hi,

From the code you've provided, everything looks okay. I'm currently
trying to reproduce the issue.
Which Flink version are you using?

Which S3 implementation did you configure in the Hadoop configuration?
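
In case it helps: that mapping usually lives in Hadoop's core-site.xml. A
minimal sketch, assuming the older native S3 filesystem (the property names
are standard Hadoop keys; the class and credential values here are
placeholders, and the right implementation depends on your Hadoop version):

```xml
<!-- core-site.xml: map the s3:// scheme to a concrete FileSystem class.
     Credentials below are placeholders, not real values. -->
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  </property>
  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```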

Regards,
Robert


On Mon, May 16, 2016 at 11:52 PM, Chen Qin <qinnc...@gmail.com> wrote:

> Hi there,
>
> I have been testing checkpointing with RocksDB backed by S3. Checkpoints
> seem to succeed except for the snapshot state of the timeWindow operator on
> a keyedStream. Here is the env setting I used:
> env.setStateBackend(new RocksDBStateBackend(new URI("s3://backupdir/")))
>
> The checkpoint always fails consistently when it reaches the window
> operator snapshot. The exception log is attached below.
> With env.setStateBackend(new RocksDBStateBackend(new URI("file:///tmp/checkpoints")))
> or the default MemoryStateBackend instead, checkpoints work with no issue.
>
> Has anyone seen this issue before? Or did I mess up the configuration?
>
> Thanks,
> Chen
>
> 2016-05-16 17:20:32,132 INFO  org.apache.flink.runtime.state.filesystem.FsStateBackend - Initializing file state backend to URI s3://xxx/checkpoints/7e6abf126ce3d18f173733b34eda81a9
> 2016-05-16 17:20:32,423 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: org.apache.flink.contrib.streaming.state.RocksDBStateBackend@2fa68a53
> 2016-05-16 17:20:32,423 INFO  org.apache.flink.runtime.state.filesystem.FsStateBackend - Initializing file state backend to URI s3://uber-beats/sjc1/checkpoints/7e6abf126ce3d18f173733b34eda81a9
> 2016-05-16 17:21:31,423 INFO  org.apache.flink.contrib.streaming.state.AbstractRocksDBState - RocksDB (/directory/flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/dbc64864de-8373-4b41-bd74-a26a8007f066) backup (synchronous part) took 8 ms.
> 2016-05-16 17:21:36,125 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while materializing asynchronous checkpoints.
> com.amazonaws.AmazonClientException: Unable to calculate MD5 hash:/directory//flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/local-chk-599 (Is a directory)
>     at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>     at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>     at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> ...
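
A side note on the "(Is a directory)" suffix in that exception: the message
text comes from the JDK. Before uploading, the AWS client opens the path with
a FileInputStream to compute the MD5 hash, and the JDK refuses to open a
directory for reading. A minimal, self-contained sketch of that underlying
failure (the class and temp-file names are made up for illustration):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;

public class DirectoryUploadDemo {

    // Attempts to open the path for reading, as the S3 client does before
    // hashing an upload. Returns the failure message, or null if it opened.
    static String openFailure(File path) {
        try (FileInputStream in = new FileInputStream(path)) {
            return null; // regular files open fine
        } catch (IOException e) {
            // On Linux the JDK reports "<path> (Is a directory)" here,
            // which is the text surfaced inside the AmazonClientException.
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("local-chk-demo").toFile();
        File file = File.createTempFile("local-chk-demo", ".tmp");
        System.out.println("directory: " + openFailure(dir));
        System.out.println("file: " + openFailure(file));
    }
}
```

So the upload fails as soon as a local-chk-* directory, rather than a single
file, is handed to putObject, which suggests the RocksDB backup directory is
being uploaded as if it were one file.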
>
>
> Tests look like:
>
> .setParallelism(1).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
>     @Override
>     public Watermark checkAndGetNextWatermark(String s, long l) {
>         long ts = System.currentTimeMillis() - 60 * 1000L;
>         return new Watermark(ts);
>     }
>
>     @Override
>     public long extractTimestamp(String s, long l) {
>         return System.currentTimeMillis();
>     }
> }).flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
>     @Override
>     public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
>         collector.collect(new Tuple2<>(s, 1L));
>     }
> }).keyBy(0).timeWindow(Time.seconds(60)).apply(new RichWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
>     @Override
>     public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable,
>                       Collector<Tuple2<String, Long>> collector) throws Exception {
>         log.info("trigger fire at {}", System.currentTimeMillis());
>         collector.collect(new Tuple2<>(timeWindow.toString(), 1L));
>     }
> }).rebalance().addSink(new FakeSink<>());
>
> JobExecutionResult result = env.execute();
>
