[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199948#comment-16199948
 ] 

Stephan Ewen commented on FLINK-7737:
-------------------------------------

I am still a bit confused - if {{hsync()}} is really only about syncing to the 
DataNode's disks, then the only case where this should make a difference is 
when all 3 data nodes are lost and at least one is recovered from the disk 
data. For any Flink TaskManager failover, this should not make any difference.

This blog article about HBase seems to confirm that {{hsync()}} only matters to 
multi DataNode failures, where data needs to be recovered from disk.

I can see that this is desirable to have as an option (should we pass that as a 
flag to the Bucketing sink?), so guarantee data on disk at checkpoint. If the 
flag is set, the Bucketing sink calls {{hsync()}}, otherwise, it calls 
{{hflush()}}?

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7737
>                 URL: https://issues.apache.org/jira/browse/FLINK-7737
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>         Environment: Dev
>            Reporter: Ryan Hobbs
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
> try {
> // At this point the refHflushOrSync cannot be null,
> // since register method would have thrown if it was.
> this.refHflushOrSync.invoke(os);
> if (os instanceof HdfsDataOutputStream) {
>                               ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>                       }
>               } catch (InvocationTargetException e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e.getCause());
> Throwable cause = e.getCause();
> if (cause != null && cause instanceof IOException) {
> throw (IOException) cause;
>                       }
> throw new RuntimeException(msg, e);
>               } catch (Exception e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e);
> throw new RuntimeException(msg, e);
>               }
>       }
> {code}
> Could a potential fix me to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
>  if (os instanceof HdfsDataOutputStream) {
>                                 ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>                         } else if (os instanceof FSDataOutputStream) {
>                                 os.hsync();
>                         }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to