[ https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202700#comment-16202700 ]
Ryan Hobbs commented on FLINK-7737:
-----------------------------------

In our case we are using a Hadoop-compatible file system (HCFS), which behaves much like an S3FileSystem or AzureDataLake would. In the traditional sense we do not have multiple data nodes; instead, data is written to a single node and distributed later. For this reason we cannot guarantee that hflush will work.

I believe the flag SYNC_BLOCK on create() would be sufficient, because it is my understanding that if that flag is set, an hflush() would trigger the hsync() upon close, which would work in our case.

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7737
>                 URL: https://issues.apache.org/jira/browse/FLINK-7737
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>         Environment: Dev
>            Reporter: Ryan Hobbs
>
> During several tests where we simulated failure conditions, we have observed
> that on HCFS systems where the data stream is of type FSDataOutputStream,
> Flink will issue hflush() and not hsync(), which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the
> output stream is of type *HdfsDataOutputStream* but not for streams of type
> *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>     try {
>         // At this point the refHflushOrSync cannot be null,
>         // since register method would have thrown if it was.
>         this.refHflushOrSync.invoke(os);
>         if (os instanceof HdfsDataOutputStream) {
>             ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>         }
>     } catch (InvocationTargetException e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e.getCause());
>         Throwable cause = e.getCause();
>         if (cause != null && cause instanceof IOException) {
>             throw (IOException) cause;
>         }
>         throw new RuntimeException(msg, e);
>     } catch (Exception e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e);
>         throw new RuntimeException(msg, e);
>     }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>     ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>     os.hsync();
> }
> {code}
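
For illustration only, the difference between the behavior described in the issue and the proposed fix can be sketched without any Hadoop or Flink dependency. Everything in this sketch is a hypothetical stand-in: `GenericStream` plays the role of a plain FSDataOutputStream on an HCFS, `HdfsLikeStream` plays the role of HdfsDataOutputStream, and the reflective `refHflushOrSync` call is assumed to resolve to hflush(). The streams only record which calls they receive; nothing is written to disk.

```java
import java.util.ArrayList;
import java.util.List;

class SyncSketch {

    /** Hypothetical stand-in for a generic FSDataOutputStream; records calls only. */
    static class GenericStream {
        final List<String> calls = new ArrayList<>();

        void hflush() { calls.add("hflush"); }

        void hsync() { calls.add("hsync"); }
    }

    /** Hypothetical stand-in for HdfsDataOutputStream (a subtype of the generic stream). */
    static class HdfsLikeStream extends GenericStream { }

    /** Models the quoted logic: flush always, sync only for the HDFS-specific type. */
    static void currentBehavior(GenericStream os) {
        os.hflush(); // assumption: the reflective call resolves to hflush()
        if (os instanceof HdfsLikeStream) {
            os.hsync();
        }
    }

    /** Models the proposed fix: flush, then sync regardless of the concrete type. */
    static void proposedBehavior(GenericStream os) {
        os.hflush();
        os.hsync();
    }

    public static void main(String[] args) {
        GenericStream hcfs = new GenericStream();
        currentBehavior(hcfs);
        System.out.println("current, generic stream:  " + hcfs.calls);

        GenericStream hcfsFixed = new GenericStream();
        proposedBehavior(hcfsFixed);
        System.out.println("proposed, generic stream: " + hcfsFixed.calls);
    }
}
```

Under these assumptions, the generic stream never receives hsync() with the current logic, which matches the data loss reported on HCFS, while the proposed variant syncs every stream type.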