[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16205136#comment-16205136
 ] 

Stephan Ewen commented on FLINK-7737:
-------------------------------------

Okay, I think the root cause here is that {{hflush()}} and {{hsync()}} 
semantics are somewhat different across file systems.

We could solve this in one of the following ways:

  1. Always {{hsync()}} in the bucketing sink, but I fear that introduces a 
performance hit for other cases (like HDFS).

  2. Always pass SYNC_BLOCK in create - will that have the same effect as 
(1)?

  3. Make it configurable for the user whether they need sync or only 
flush. But I fear most users will get that wrong.

  4. Make the file systems obey a stricter definition of {{flush()}}, meaning 
it needs to guarantee persistence for loss of a TaskManager. Then this is up to 
the file system implementer or the wrapper to forward these calls properly. 
BTW, it has just gotten a lot easier to plug in new file systems. A file system 
based on Hadoop can also be explicitly exposed to handle certain situations 
differently than the other file systems loaded through Hadoop's abstraction: 
https://github.com/apache/flink/pull/4781 (already merged into master)
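
To make option (4) concrete, here is a minimal sketch of a wrapper whose {{flush()}} carries the stricter guarantee. It is only an illustration: {{DurableFlushOutputStream}} is a hypothetical name, not a Flink or Hadoop class, and a local {{FileOutputStream}} stands in for the file system's stream. A Hadoop-based wrapper would presumably forward to {{hsync()}} in the same place.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical wrapper (not a Flink or Hadoop class): flush() also asks the OS
 * to persist the bytes, so callers only ever need to call flush().
 */
final class DurableFlushOutputStream extends OutputStream {

    private final FileOutputStream out;

    DurableFlushOutputStream(FileOutputStream out) {
        this.out = out;
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        out.flush();         // hand any buffered bytes to the OS
        out.getFD().sync();  // then force them to the underlying device
    }

    @Override
    public void close() throws IOException {
        out.close();
    }
}
```

With a wrapper like this, the sink keeps calling {{flush()}} and the decision about how strong the call has to be stays with the file system implementation.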

I believe that this is an issue for more cases. For example, in order to 
guarantee recoverability of a task manager, the file system mounted under 
{{file://}} needs only {{flush()}} if it is a local file system, but needs to 
{{sync()}} if it is a mounted NFS style file system.
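
As a rough sketch of that {{file://}} distinction, the file store type reported by the JVM could drive the decision. {{FilePersistencePolicy}} and {{requiresSync}} are hypothetical names, and the "starts with nfs" check is an assumption for illustration: the string returned by {{FileStore.type()}} is implementation specific, so a real check would need to cover more cases.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;

/** Hypothetical helper: decides whether flush() alone is enough for a path. */
final class FilePersistencePolicy {

    private FilePersistencePolicy() {}

    /** Assumption: NFS-style mounts report a type starting with "nfs" (e.g. "nfs4"). */
    static boolean requiresSync(String fileStoreType) {
        return fileStoreType.toLowerCase(Locale.ROOT).startsWith("nfs");
    }

    /** Looks up the file store backing the given path and applies the rule above. */
    static boolean requiresSync(Path path) throws IOException {
        return requiresSync(Files.getFileStore(path).type());
    }
}
```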

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7737
>                 URL: https://issues.apache.org/jira/browse/FLINK-7737
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>         Environment: Dev
>            Reporter: Ryan Hobbs
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>     try {
>         // At this point the refHflushOrSync cannot be null,
>         // since register method would have thrown if it was.
>         this.refHflushOrSync.invoke(os);
>         if (os instanceof HdfsDataOutputStream) {
>             ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>         }
>     } catch (InvocationTargetException e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e.getCause());
>         Throwable cause = e.getCause();
>         if (cause != null && cause instanceof IOException) {
>             throw (IOException) cause;
>         }
>         throw new RuntimeException(msg, e);
>     } catch (Exception e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e);
>         throw new RuntimeException(msg, e);
>     }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>     ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>     os.hsync();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
