I tried reproducing the issue using the
org.apache.hadoop.fs.s3a.S3AFileSystem and it worked.
I had some dependency issues with the S3AFileSystem so I didn't follow that
path for now. If you've used the S3AFileSystem, I can try to get that one
working as well.
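
A side note on the stack trace: the "(Is a directory)" suffix looks like the
message java.io.FileInputStream produces on Linux when it is asked to open a
directory. That would suggest the uploader was handed the local-chk-599
directory itself rather than a single file, though I have not confirmed this.
Below is a plain-JDK sketch of that failure mode. It is not Flink or AWS SDK
code, and the names ChunkMd5Sketch, md5Hex and regularFilesUnder are made up
for illustration.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink or the AWS SDK.
final class ChunkMd5Sketch {

    // Returns the hex-encoded MD5 of a single regular file.
    // Opening a directory with FileInputStream throws an IOException
    // (a FileNotFoundException, "... (Is a directory)" on Linux).
    static String md5Hex(Path file) throws IOException {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
        try (InputStream in =
                new DigestInputStream(new FileInputStream(file.toFile()), md5)) {
            byte[] buffer = new byte[8192];
            while (in.read(buffer) != -1) {
                // Reading through the stream updates the digest.
            }
        }
        return String.format("%032x", new BigInteger(1, md5.digest()));
    }

    // Lists the regular files below a directory, which is what would have
    // to be hashed and uploaded one by one instead of the directory itself.
    static List<Path> regularFilesUnder(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                        .sorted()
                        .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("local-chk");
        Files.write(dir.resolve("part-0"), "abc".getBytes(StandardCharsets.UTF_8));

        try {
            md5Hex(dir); // hashing the directory itself fails
        } catch (IOException e) {
            System.out.println("directory: " + e.getMessage());
        }
        for (Path file : regularFilesUnder(dir)) {
            System.out.println(file.getFileName() + ": " + md5Hex(file));
        }
    }
}
```

If this is what happens, hashing each regular file under the backup directory
works while hashing the directory path fails, which matches the shape of the
exception in the log.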

On Tue, May 17, 2016 at 11:59 AM, Robert Metzger <rmetz...@apache.org>
wrote:

> Hi,
>
> from the code you've provided, everything seems to look okay. I'm
> currently trying to reproduce the issue.
> Which Flink version are you using?
>
> Which s3 implementation did you configure in the hadoop configuration?
>
> Regards,
> Robert
>
>
> On Mon, May 16, 2016 at 11:52 PM, Chen Qin <qinnc...@gmail.com> wrote:
>
>> Hi there,
>>
>> I have been testing checkpointing on RocksDB backed by S3. Checkpoints
>> seem to succeed, except for the state snapshots of the timeWindow operator
>> on a keyedStream. Here is the env setting I used:
>> env.setStateBackend(new RocksDBStateBackend(new URI("s3://backupdir/")))
>>
>> The checkpoint fails consistently when it gets to the window
>> operator snapshotting. The exception log is attached below.
>> When I use env.setStateBackend(new RocksDBStateBackend(new URI(
>> "file:///tmp/checkpoints"))); or the default MemoryStateBackend,
>> checkpoints work without issues.
>>
>> Has anyone seen this issue before? Or did I mess up the configuration?
>>
>> Thanks,
>> Chen
>>
>> 2016-05-16 17:20:32,132 INFO
>>  org.apache.flink.runtime.state.filesystem.FsStateBackend      -
>> Initializing file state backend to URI
>> s3://xxx/checkpoints/7e6abf126ce3d18f173733b34eda81a9
>> 2016-05-16 17:20:32,423 INFO
>>  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using
>> user-defined state backend:
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend@2fa68a53
>> 2016-05-16 17:20:32,423 INFO
>>  org.apache.flink.runtime.state.filesystem.FsStateBackend      -
>> Initializing file state backend to URI
>> s3://uber-beats/sjc1/checkpoints/7e6abf126ce3d18f173733b34eda81a9
>> 2016-05-16 17:21:31,423 INFO
>>  org.apache.flink.contrib.streaming.state.AbstractRocksDBState  - RocksDB
>> (/directory/flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/dbc64864de-8373-4b41-bd74-a26a8007f066)
>> backup (synchronous part) took 8 ms.
>> 2016-05-16 17:21:36,125 ERROR
>> org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught
>> exception while materializing asynchronous checkpoints.
>> com.amazonaws.AmazonClientException: Unable to calculate MD5
>> hash:/directory//flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/local-chk-599
>> (Is a directory)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>> at
>> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>> at
>> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>> at
>> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>> at
>> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>> at
>> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>
>> ...
>>
>>
>> The test looks like this:
>>
>> .setParallelism(1).assignTimestampsAndWatermarks(
>>         new AssignerWithPunctuatedWatermarks<String>() {
>>     @Override
>>     public Watermark checkAndGetNextWatermark(String s, long l) {
>>         // Watermark trails the wall clock by 60 seconds.
>>         long ts = System.currentTimeMillis() - 60 * 1000L;
>>         return new Watermark(ts);
>>     }
>>
>>     @Override
>>     public long extractTimestamp(String s, long l) {
>>         // Use the wall-clock time as the element timestamp.
>>         return System.currentTimeMillis();
>>     }
>> }).flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
>>     @Override
>>     public void flatMap(String s, Collector<Tuple2<String, Long>> collector)
>>             throws Exception {
>>         collector.collect(new Tuple2<>(s, 1L));
>>     }
>> }).keyBy(0).timeWindow(Time.seconds(60)).apply(
>>         new RichWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>,
>>                 Tuple, TimeWindow>() {
>>
>>     @Override
>>     public void apply(Tuple tuple, TimeWindow timeWindow,
>>             Iterable<Tuple2<String, Long>> iterable,
>>             Collector<Tuple2<String, Long>> collector) throws Exception {
>>         // Log the wall-clock time at which the window fires.
>>         log.info("trigger fire at {}", System.currentTimeMillis());
>>         collector.collect(new Tuple2<>(timeWindow.toString(), 1L));
>>     }
>> }).rebalance().addSink(new FakeSink<>());
>>
>>
>> JobExecutionResult result = env.execute();
>>
>>
>>
>>
>>
>
