The root cause of this problem seems to be that Flink is copying directories
with the FileSystem. Unfortunately, unlike the default HDFS implementation,
org.apache.hadoop.fs.s3a.S3AFileSystem does not implement a recursive
copyFromLocalFile, so Flink 1.0.3 fails when it tries to copy a Window Operator
savepoint directory. Flink 1.1 is worse, as it cannot even set up the session,
because it tries to copy the flink/lib dir on init.

I can work around this in 1.0.3 by subclassing S3AFileSystem and implementing
a recursive copyFromLocalFile. Unfortunately, this isn’t good enough for Flink
1.1, since it expects the copied “lib” directory to exist in the cache in order
to set up the classpath (I think).
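
For what it’s worth, the workaround subclass is roughly along these lines (a
minimal sketch, with a made-up class name, that walks the local directory and
falls back to the stock S3A single-file upload; the exact copyFromLocalFile
variant to override may differ between Hadoop versions):

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public class RecursiveS3AFileSystem extends S3AFileSystem {

  @Override
  public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
      throws IOException {
    FileSystem local = FileSystem.getLocal(getConf());
    if (local.getFileStatus(src).isDirectory()) {
      // S3A's own upload path only handles single files, so walk the local
      // directory and copy its children one by one.
      for (FileStatus child : local.listStatus(src)) {
        copyFromLocalFile(delSrc, overwrite, child.getPath(),
            new Path(dst, child.getPath().getName()));
      }
      if (delSrc) {
        local.delete(src, true);
      }
    } else {
      // Plain files go through the stock S3A single-object upload.
      super.copyFromLocalFile(delSrc, overwrite, src, dst);
    }
  }
}

I then point fs.s3a.impl at this class so the copies go through it.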

I’m really hoping there is something simple that I’m missing here that someone
can fill me in on. Anyone else successfully up and running with Flink -> YARN
-> S3A? If so, what versions of Hadoop and Flink, and was there anything you
did other than configure core-site.xml?
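
For reference, the configuration I have in mind is just the standard s3a keys.
A minimal sketch, shown programmatically here although the same keys normally
live in core-site.xml; the values are placeholders, and fs.s3a.impl can point
at the recursive-copy subclass above if you use that workaround:

import org.apache.hadoop.conf.Configuration;

public class S3aConfSketch {

  /** Mirrors the core-site.xml entries; the values below are placeholders. */
  static Configuration s3aConf() {
    Configuration conf = new Configuration();
    // Declare the implementation class explicitly (on older Hadoop versions
    // s3a is not registered in core-default.xml).
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");
    conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");
    return conf;
  }
}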

-Cliff


From: Clifford Resnick <cresn...@mediamath.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Saturday, July 16, 2016 at 12:26 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Error using S3a State Backend: Window Operators sending directory 
instead of fully qualified file?

Using Flink 1.1-SNAPSHOT, Hadoop-aws 2.6.4

The error I’m getting is:

11:05:44,425 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while materializing asynchronous checkpoints.
com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/t8/k5764ltj4sq4ft06c1zp0nxn928mwr/T/flink-io-247956be-e422-4222-a512-e3ae321b1590/ede87211c622f86d1ef7b2b323076e79/WindowOperator_10_3/dummy_state/31b7ca7b-dc94-4d40-84c7-4f10ebc644a2/local-chk-1 (Is a directory)
        at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1266)
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

In the debugger I noticed that some of the uploaded checkpoints are from the
configured /tmp location. These succeed because the file in the request is a
fully qualified file, but I guess it’s different for WindowOperators? Here the
file in the request (under a different /var/folders.. location not configured
by me – must be a Mac thing?) is actually a directory. The AWS API fails when
it tries to calculate an MD5 of the directory. The Flink side of the code path
is hard to discern from debugging because it’s asynchronous.
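
If it helps, I believe the AWS side of this can be reproduced outside of Flink
by handing the transfer manager a directory instead of a file. A rough sketch
(bucket, key and local path are made up; credentials come from the default
provider chain):

import java.io.File;

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;

public class Md5OnDirectoryRepro {

  public static void main(String[] args) throws Exception {
    TransferManager transfers = new TransferManager(new AmazonS3Client());
    // Uploading a directory as if it were a single object is what appears to
    // produce "Unable to calculate MD5 hash: ... (Is a directory)".
    Upload upload = transfers.upload("some-bucket", "checkpoints/local-chk-1",
        new File("/tmp/some-local-checkpoint-dir"));
    upload.waitForUploadResult();
  }
}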

I get the same issue whether running locally or on a CentOS-based YARN cluster.
Everything works if I use HDFS instead. Any insight will be greatly
appreciated! When I get a chance later I may try S3N, or perhaps S3A with MD5
verification skipped.

-Cliff



