Hi there,

I have been testing checkpointing with RocksDB backed by S3. Checkpoints
succeed except for the snapshot state of the timeWindow operator on a
keyedStream. Here is the env setting I used:
env.setStateBackend(new RocksDBStateBackend(new URI("s3://backupdir/")));

The checkpoint consistently fails when it reaches the window operator
snapshot. Exception log attached below.
If I instead use env.setStateBackend(new RocksDBStateBackend(new URI(
"file:///tmp/checkpoints"))); or the MemoryStateBackend (the default),
checkpoints work without issue.

Has anyone seen this issue before? Or did I mess up the configuration?

Thanks,
Chen

2016-05-16 17:20:32,132 INFO
 org.apache.flink.runtime.state.filesystem.FsStateBackend      -
Initializing file state backend to URI
s3://xxx/checkpoints/7e6abf126ce3d18f173733b34eda81a9
2016-05-16 17:20:32,423 INFO
 org.apache.flink.streaming.runtime.tasks.StreamTask           - Using
user-defined state backend:
org.apache.flink.contrib.streaming.state.RocksDBStateBackend@2fa68a53
2016-05-16 17:20:32,423 INFO
 org.apache.flink.runtime.state.filesystem.FsStateBackend      -
Initializing file state backend to URI
s3://uber-beats/sjc1/checkpoints/7e6abf126ce3d18f173733b34eda81a9
2016-05-16 17:21:31,423 INFO
 org.apache.flink.contrib.streaming.state.AbstractRocksDBState  - RocksDB
(/directory/flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/dbc64864de-8373-4b41-bd74-a26a8007f066)
backup (synchronous part) took 8 ms.
2016-05-16 17:21:36,125 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught
exception while materializing asynchronous checkpoints.
com.amazonaws.AmazonClientException: Unable to calculate MD5
hash:/directory//flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/local-chk-599
(Is a directory)
at
com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
at
com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
at
com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
at
com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
at
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
at
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

...
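To sanity-check the "(Is a directory)" part of the message: the log shows the RocksDB backup produced a local directory (.../window-contents/local-chk-599), and the S3 client apparently tries to read it as a single file to compute its MD5 before uploading. Plain JDK I/O fails the same way when you open a directory as a stream. A minimal stdlib-only sketch (hypothetical temp path, no AWS SDK involved) that reproduces this failure mode on Linux:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.nio.file.Files;

public class DirectoryReadDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for the local checkpoint *directory* that the
        // snapshot leaves behind (e.g. .../local-chk-599).
        File checkpointDir = Files.createTempDirectory("local-chk-demo").toFile();
        try {
            // Opening a directory as an InputStream fails just like the
            // MD5 calculation in the stack trace above.
            new FileInputStream(checkpointDir).close();
        } catch (FileNotFoundException e) {
            // On Linux the message ends with "(Is a directory)".
            System.out.println("read failed: " + e.getMessage());
        } finally {
            checkpointDir.delete();
        }
    }
}
```

So the upload path seems to assume a single file where the RocksDB snapshot hands it a directory.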


The test job looks like this:

.setParallelism(1)
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
    @Override
    public Watermark checkAndGetNextWatermark(String s, long l) {
        // Watermark trails wall-clock time by one minute
        long ts = System.currentTimeMillis() - 60 * 1000L;
        return new Watermark(ts);
    }

    @Override
    public long extractTimestamp(String s, long l) {
        return System.currentTimeMillis();
    }
})
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Long>> collector)
            throws Exception {
        collector.collect(new Tuple2<>(s, 1L));
    }
})
.keyBy(0)
.timeWindow(Time.seconds(60))
.apply(new RichWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>,
        Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow,
            Iterable<Tuple2<String, Long>> iterable,
            Collector<Tuple2<String, Long>> collector) throws Exception {
        // "{}" placeholder needed, or the timestamp argument is dropped
        log.info("trigger fire at {}", System.currentTimeMillis());
        collector.collect(new Tuple2<>(timeWindow.toString(), 1L));
    }
})
.rebalance()
.addSink(new FakeSink<>());


JobExecutionResult result = env.execute();
