The root cause of this problem seems to be that Flink is copying directories with the FileSystem. Unfortunately, unlike the default HDFS implementation, org.apache.hadoop.fs.s3a.S3AFileSystem does not implement a recursive copyFromLocalFile, so Flink 1.0.3 fails when it tries to copy a WindowOperator savepoint directory. Flink 1.1 is worse: it cannot even set up the session, because it tries to copy the flink/lib directory on init.
I can work around this in 1.0.3 by subclassing S3AFileSystem and implementing a recursive copyFromLocalFile (a rough sketch of that subclass is at the bottom of this message). Unfortunately, this isn't good enough for Flink 1.1, since it expects the copied "lib" directory to exist in the cache to set up the classpath with (I think). I'm really hoping there is something simple that I'm missing here that someone can fill me in on.

Is anyone else successfully up and working with Flink -> YARN -> S3A? If so, what versions of Hadoop and Flink did you use, and was there anything you did other than configure core-site.xml?

-Cliff

From: Clifford Resnick <cresn...@mediamath.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Saturday, July 16, 2016 at 12:26 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

Using Flink 1.1-SNAPSHOT, Hadoop-aws 2.6.4

The error I'm getting is:

11:05:44,425 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while materializing asynchronous checkpoints.
com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/t8/k5764ltj4sq4ft06c1zp0nxn928mwr/T/flink-io-247956be-e422-4222-a512-e3ae321b1590/ede87211c622f86d1ef7b2b323076e79/WindowOperator_10_3/dummy_state/31b7ca7b-dc94-4d40-84c7-4f10ebc644a2/local-chk-1 (Is a directory)
        at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1266)
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

In the debugger I noticed that some of the uploaded checkpoints are from the configured /tmp location. These succeed because the file in the request is fully qualified, but I guess it's different for WindowOperators? Here the file in the request (using a different /var/folders.. location that I did not configure; must be a Mac thing?) is actually a directory. The AWS API is failing when it tries to calculate an MD5 of the directory. The Flink side of the code path is hard to discern from debugging because it's asynchronous.

I get the same issue whether running locally or on a CentOS-based YARN cluster. Everything works if I use HDFS instead. Any insight will be greatly appreciated! When I get a chance later I may try S3n, or perhaps S3a with MD5 verification skipped.

-Cliff
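P.S. For reference, here is roughly what the workaround subclass looks like. This is a trimmed sketch of the general approach rather than the exact code I'm running, and the class name is just illustrative; it only uses standard Hadoop FileSystem calls (getLocal, getFileStatus, listStatus, mkdirs, copyFromLocalFile).

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

/**
 * Sketch only: walks local directories file-by-file instead of handing the
 * whole directory to S3A, which passes it straight to the AWS TransferManager
 * and fails on the MD5 calculation.
 */
public class RecursiveCopyS3AFileSystem extends S3AFileSystem {

    @Override
    public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
            throws IOException {
        FileSystem localFs = FileSystem.getLocal(getConf());
        if (localFs.getFileStatus(src).isDirectory()) {
            // Directory: create the target prefix and copy each child recursively.
            mkdirs(dst);
            for (FileStatus child : localFs.listStatus(src)) {
                copyFromLocalFile(delSrc, overwrite, child.getPath(),
                        new Path(dst, child.getPath().getName()));
            }
            if (delSrc) {
                localFs.delete(src, true);
            }
        } else {
            // Single file: let S3A do its normal single-object upload.
            super.copyFromLocalFile(delSrc, overwrite, src, dst);
        }
    }
}

I then point fs.s3a.impl in core-site.xml at this subclass instead of org.apache.hadoop.fs.s3a.S3AFileSystem so it gets picked up for s3a:// paths.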