[ 
https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886417#comment-15886417
 ] 

Seth Wiesman commented on FLINK-5706:
-------------------------------------

[~StephanEwen] To expand on our conversation from the mailing list.

As I mentioned before, EMR handles S3 consistency by building a [consistent 
view|http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html]
 using a DynamoDB table. The core idea is that we should be able to trust that 
Flink is in a consistent state, so when a file system operation such as a rename 
or a delete is performed, we should first check that the requested file exists 
and, if not, wait for it to appear. However, the file may actually not exist for 
some external reason, or it may take S3 longer to become consistent than we are 
willing to wait, because S3's time to consistency has no upper bound. 

{code:java}
public void delete(String path) throws Exception {
    for (int i = 0; i < numPasses; i++) {
        if (exists(path)) {
            delete_impl(path);
            return; // success: stop retrying
        }
        // Back off and give S3 a chance to become consistent.
        Thread.sleep(sleepMillis); // sleepMillis: some configured backoff
    }

    throw new FileNotFoundException(
        "File either does not exist or took too long to become consistent: " + path);
}
{code} 

This is as far as I went with my implementation, and it seems to work for most 
cases, but it contains two core issues: 

1) We are not able to differentiate between inconsistent files and missing 
files. If Flink is running in real time, we are probably hitting a consistency 
error, but what if the user restarts from a checkpoint in the past? In that 
case the files may genuinely not exist, leaving Flink in an inconsistent state, 
which breaks the core invariant of this solution (the first sketch below shows 
one way to tell the two cases apart). 

2) Certain operations really do take too long to become consistent, causing the 
entire pipeline to slow down. Take the bucketing sink as an example. On 
checkpoint, the current in-progress file is [renamed to pending|
https://github.com/apache/flink/blob/8bcb2ae3ccf6a58d8f42f29d67fdb7d88a95f8ed/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L563].
 This works because S3 has read-after-write consistency. But after the 
checkpoint is complete, the [pending file is again renamed to complete|
https://github.com/apache/flink/blob/8bcb2ae3ccf6a58d8f42f29d67fdb7d88a95f8ed/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L655].
 S3 renames have no consistency guarantees, so this will eventually 
become inconsistent. The problem I ran into was that even with the updated fs 
implementation there would inevitably be files which took upwards of several 
minutes to become consistent. Eventually this led me to write a custom sink 
specifically for S3 which understands what it is capable of (a rough sketch of 
the idea follows below). Ultimately I believe this shows that for true S3 
interop, certain parts of this problem will bleed through the file system 
abstraction. 
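
Regarding (1), here is roughly how a consistent view could be used to tell the 
two cases apart. This is only a sketch, under the assumption that we maintain 
our own metadata store recording every file Flink has written (the way EMRFS 
uses its DynamoDB table); ConsistentView, Miss, and the store predicate are all 
invented names, not Flink or AWS APIs:

{code:java}
import java.util.function.Predicate;

public final class ConsistentView {

    public enum Miss { GENUINELY_MISSING, LIKELY_INCONSISTENT }

    // Consistent view: answers "was this path ever written by Flink?"
    // In practice this would be backed by a metadata table, not a predicate.
    private final Predicate<String> store;

    public ConsistentView(Predicate<String> store) {
        this.store = store;
    }

    /** Decide how to treat a path that S3 currently reports as absent. */
    public Miss classify(String path) {
        // If the metadata store has no record of the file, no amount of
        // waiting will make it appear: a restart from an old checkpoint,
        // for example, can legitimately reference files that are gone.
        return store.test(path) ? Miss.LIKELY_INCONSISTENT : Miss.GENUINELY_MISSING;
    }

    public static void main(String[] args) {
        ConsistentView view = new ConsistentView(p -> p.endsWith("part-0001"));
        System.out.println(view.classify("s3://bucket/part-0001")); // LIKELY_INCONSISTENT
        System.out.println(view.classify("s3://bucket/part-9999")); // GENUINELY_MISSING
    }
}
{code}

With this in place, the retry loop in delete() above would only spin on a 
LIKELY_INCONSISTENT miss and could fail fast on a GENUINELY_MISSING one.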
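
Regarding (2), one shape such an S3-aware sink could take is sketched below. 
Every name in it is hypothetical and putObject merely stands in for an AWS SDK 
call; the idea is to write each object exactly once under its final name and 
keep the pending/complete lifecycle in checkpointed Flink state, so no 
consistency-sensitive rename is ever performed or read back:

{code:java}
import java.util.ArrayList;
import java.util.List;

public final class S3AwareSinkSketch {

    // Paths whose checkpoint has completed; safe to expose to downstream readers.
    private final List<String> committed = new ArrayList<>();

    // Paths written since the last checkpoint; tracked in state rather than
    // derived by listing or renaming files on S3.
    private final List<String> pending = new ArrayList<>();

    void write(String finalPath, byte[] bytes) {
        putObject(finalPath, bytes); // single PUT, read-after-write consistent
        pending.add(finalPath);      // lifecycle lives in Flink state, not in file names
    }

    void notifyCheckpointComplete() {
        // No rename: the files were already written under their final names,
        // we only flip their status in our own state.
        committed.addAll(pending);
        pending.clear();
    }

    private void putObject(String path, byte[] bytes) {
        // Stand-in for an AWS SDK PutObject call.
    }
}
{code}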

> Implement Flink's own S3 filesystem
> -----------------------------------
>
>                 Key: FLINK-5706
>                 URL: https://issues.apache.org/jira/browse/FLINK-5706
>             Project: Flink
>          Issue Type: New Feature
>          Components: filesystem-connector
>            Reporter: Stephan Ewen
>
> As part of the effort to make Flink completely independent from Hadoop, Flink 
> needs its own S3 filesystem implementation. Currently Flink relies on 
> Hadoop's S3a and S3n file systems.
> Flink's own S3 file system can be implemented using the AWS SDK. As the basis 
> of the implementation, the Hadoop File System can be used (Apache licensed, 
> so it should be okay to reuse some code as long as we do proper attribution).


