[ https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886417#comment-15886417 ]
Seth Wiesman commented on FLINK-5706:
-------------------------------------

[~StephanEwen] To expand on our conversation from the mailing list. As I mentioned before, EMR handles S3 consistency by building a [consistent view|http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html] using a DynamoDB table. The core idea is that we should be able to trust that Flink is in a consistent state, so when a file system operation such as a rename or a delete is performed, we should first check that the requested file exists and, if not, wait for it. However, the file may genuinely not exist for some external reason, or it may take S3 longer to become consistent than we are willing to wait, because S3 consistency has no upper bound.

{code:java}
public void delete(String path) throws Exception {
    for (int i = 0; i < numPasses; i++) {
        if (exists(path)) {
            delete_impl(path);
            return;
        }
        // back off before re-checking; convergence has no upper bound
        Thread.sleep(backoffMillis);
    }
    throw new FileNotFoundException(
        "File either does not exist or took too long to become consistent: " + path);
}
{code}

This is as far as I went with my implementation, and it seems to work in most cases, but it contains two core issues:

1) We are not able to differentiate between inconsistent files and missing files. If Flink is running in real time we are probably hitting a consistency error, but what if the user restarts from a checkpoint in the past? In that case the files may actually not exist, leaving Flink in an inconsistent state, which breaks the core invariant of this solution.

2) Certain operations really do take too long to become consistent, slowing the entire pipeline down. Take the bucketing sink as an example. On checkpoint, the current in-progress file is [renamed to pending|https://github.com/apache/flink/blob/8bcb2ae3ccf6a58d8f42f29d67fdb7d88a95f8ed/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L563]. This works because S3 offers read-after-write consistency for new objects. But after the checkpoint is complete, the [pending file is again renamed to complete|https://github.com/apache/flink/blob/8bcb2ae3ccf6a58d8f42f29d67fdb7d88a95f8ed/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L655]. S3 renames carry no consistency guarantee, so this step will eventually observe an inconsistent view (a sketch of what such a rename looks like against the SDK follows at the end of this comment).

The problem I ran into was that, even with the updated file system implementation, some files inevitably took upwards of several minutes to become consistent. Eventually this led to writing a custom sink specifically for S3, one that understands what the store is capable of. Ultimately I believe this shows that for true S3 interop, certain parts of this problem will bleed through the file system abstraction.

> Implement Flink's own S3 filesystem
> -----------------------------------
>
>                 Key: FLINK-5706
>                 URL: https://issues.apache.org/jira/browse/FLINK-5706
>             Project: Flink
>          Issue Type: New Feature
>          Components: filesystem-connector
>            Reporter: Stephan Ewen
>
> As part of the effort to make Flink completely independent from Hadoop, Flink needs its own S3 filesystem implementation. Currently Flink relies on Hadoop's S3a and S3n file systems.
>
> An S3 file system of Flink's own can be implemented using the AWS SDK. As the basis of the implementation, the Hadoop File System can be used (Apache licensed, should be okay to reuse some code as long as we do a proper attribution).
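Since the two-phase rename is where this bites hardest, here is a minimal sketch of what a consistency-checked rename against S3 looks like, assuming the AWS SDK for Java v1 ({{AmazonS3}}). This is an illustration, not the BucketingSink's actual code: {{NUM_PASSES}} and {{BACKOFF_MILLIS}} are made-up placeholders, and rename is modeled as copy-then-delete because S3 has no native rename.

{code:java}
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

import java.io.FileNotFoundException;

public class ConsistentRenameSketch {

    private static final int NUM_PASSES = 10;        // placeholder retry budget
    private static final long BACKOFF_MILLIS = 500L; // placeholder backoff

    private final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

    /**
     * "Renames" src to dst within one bucket. S3 has no rename, so this is
     * copy-then-delete, guarded by an existence loop because the source key
     * may not be visible yet and convergence has no upper bound.
     */
    public void rename(String bucket, String src, String dst) throws Exception {
        for (int i = 0; i < NUM_PASSES; i++) {
            if (s3.doesObjectExist(bucket, src)) {
                s3.copyObject(bucket, src, bucket, dst);
                s3.deleteObject(bucket, src);
                return;
            }
            Thread.sleep(BACKOFF_MILLIS); // wait and re-check
        }
        // Indistinguishable outcomes: the key was never written, or it is
        // still inconsistent -- exactly issue 1) above.
        throw new FileNotFoundException(bucket + "/" + src);
    }
}
{code}

Note that the guard only narrows the window: the copy can succeed while a bucket listing, or a read of an overwritten key, still returns a stale view, which is why the pending-to-complete rename can stall a pipeline for minutes.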
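On the issue itself, the mapping from file system operations to S3 calls is the easy part. Below is a rough sketch of that mapping, again assuming the AWS SDK for Java v1; the class name and bucket wiring are invented for illustration, and it deliberately does not extend {{org.apache.flink.core.fs.FileSystem}}, whose abstract signatures this sketch does not attempt to match.

{code:java}
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import java.io.File;
import java.io.InputStream;

/** Illustrative mapping of file system calls onto S3 operations; all names are made up. */
public class S3OperationsSketch {

    private final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    private final String bucket;

    public S3OperationsSketch(String bucket) {
        this.bucket = bucket;
    }

    /** open -> GET; covered by read-after-write consistency for brand-new keys. */
    public InputStream open(String key) {
        return s3.getObject(bucket, key).getObjectContent();
    }

    /** create -> PUT of a local file; S3 uploads whole objects, there is no append. */
    public void create(String key, File contents) {
        s3.putObject(bucket, key, contents);
    }

    /** delete -> DELETE; idempotent at the API level, but LISTs may lag behind it. */
    public void delete(String key) {
        s3.deleteObject(bucket, key);
    }

    /** listStatus -> LIST, which is only eventually consistent; callers must tolerate stale views. */
    public void list(String prefix) {
        for (S3ObjectSummary summary : s3.listObjects(bucket, prefix).getObjectSummaries()) {
            System.out.println(summary.getKey() + "\t" + summary.getSize());
        }
    }
}
{code}

The hard part, per the discussion above, is deciding where the consistency handling lives: inside these methods (as EMR's consistent view does with its DynamoDB table) or above the abstraction, in the sinks and checkpointing code.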