[ https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16205136#comment-16205136 ]
Stephan Ewen commented on FLINK-7737:
-------------------------------------

Okay, I think the root cause here is that {{hflush()}} and {{hsync()}} semantics differ somewhat across file systems. We could solve this in one of the following ways:

1. Always {{hsync()}} in the bucketing sink, but I fear that introduces a performance hit for other cases (like HDFS).
2. Always pass SYNC_BLOCK when calling create(). Would that have the same effect as (1)?
3. Make it configurable for the user whether they need sync or only flush. But I fear most users will get that wrong.
4. Make the file systems obey a stricter definition of {{flush()}}, meaning it must guarantee persistence if a TaskManager is lost. It is then up to the file system implementer or the wrapper to forward these calls properly.

BTW, it has just gotten a lot easier to plug in new file systems. A file system based on Hadoop can also be explicitly exposed so that it handles certain situations differently from the other file systems loaded through Hadoop's abstraction: https://github.com/apache/flink/pull/4781 (already merged into master).

I believe this is an issue in more cases. For example, to guarantee recoverability of a TaskManager, the file system mounted under {{file://}} needs only {{flush()}} if it is a local file system, but needs {{sync()}} if it is a mounted NFS-style file system.
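The {{file://}} example and options 3 and 4 hinge on the difference between flushing and syncing. Below is a minimal sketch using only the JDK, not Flink's or Hadoop's API. The hypothetical {{DurableFlushStream}} wrapper lets whoever constructs it (in option 4, the file system wrapper rather than the sink) decide whether {{flush()}} must also call {{FileDescriptor.sync()}}. Here {{flush()}} only hands bytes to the operating system, while {{sync()}} asks the OS to force them to the storage device. Whether that is sufficient on a particular NFS mount is an assumption this sketch cannot verify.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

/**
 * Hypothetical wrapper (not a Flink or Hadoop class) sketching option 4: the
 * wrapper, not the sink, decides whether flush() also forces data to disk.
 */
class DurableFlushStream extends FilterOutputStream {
    private final FileOutputStream file;
    private final boolean syncOnFlush; // e.g. true for an NFS-style mount, false for a local disk
    private int syncCount = 0;

    DurableFlushStream(FileOutputStream file, boolean syncOnFlush) {
        super(file);
        this.file = file;
        this.syncOnFlush = syncOnFlush;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // FilterOutputStream writes byte by byte by default; delegate in bulk instead.
        out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        // Hand any buffered bytes to the OS (page cache).
        out.flush();
        if (syncOnFlush) {
            // Ask the OS to force the bytes to the storage device (fsync).
            file.getFD().sync();
            syncCount++;
        }
    }

    int syncCount() {
        return syncCount;
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("part-", ".txt");
        f.deleteOnExit();
        try (DurableFlushStream out = new DurableFlushStream(new FileOutputStream(f), true)) {
            out.write("record-1\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
            System.out.println("syncs: " + out.syncCount()); // prints "syncs: 1"
        }
        // prints "record-1"
        System.out.println(new String(Files.readAllBytes(f.toPath()), StandardCharsets.UTF_8).trim());
    }
}
```

With this split, the sink can keep calling {{flush()}} everywhere, and only file systems that need the stronger guarantee pay for the sync.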
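Option 1 is roughly what the reporter proposes in the quoted issue description. The sketch below replays that dispatch against hypothetical stand-ins. {{ProposedFix}}, {{FakeFsStream}} and {{FakeHdfsStream}} are illustrative names, not Hadoop or Flink classes. The reflective {{refHflushOrSync.invoke(os)}} is replaced by a direct {{hflush()}} call so the sketch runs without Hadoop on the classpath. Because the parameter is already declared as the base stream type, the proposal's {{else if (os instanceof FSDataOutputStream)}} branch is taken for every non-null stream that is not HDFS-specific. The sketch therefore uses a plain {{else}}.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

/** Mirrors the shape of the proposed fix: flush, then sync with the best call for the stream type. */
class ProposedFix {
    static void hflushOrSync(FakeFsStream os) {
        os.hflush(); // stands in for the reflective refHflushOrSync.invoke(os)
        if (os instanceof FakeHdfsStream) {
            ((FakeHdfsStream) os).hsync(EnumSet.of(FakeHdfsStream.SyncFlag.UPDATE_LENGTH));
        } else {
            // Every other (non-null) stream is already the base type, so always sync it.
            os.hsync();
        }
    }

    public static void main(String[] args) {
        FakeFsStream generic = new FakeFsStream();
        FakeHdfsStream hdfs = new FakeHdfsStream();
        hflushOrSync(generic);
        hflushOrSync(hdfs);
        System.out.println(generic.calls); // prints [hflush, hsync]
        System.out.println(hdfs.calls);    // prints [hflush, hsync[UPDATE_LENGTH]]
    }
}

/** Hypothetical stand-in for a generic FSDataOutputStream; records which calls were made. */
class FakeFsStream {
    final List<String> calls = new ArrayList<>();

    void hflush() { calls.add("hflush"); }

    void hsync() { calls.add("hsync"); }
}

/** Hypothetical stand-in for HdfsDataOutputStream with its flag-taking hsync overload. */
class FakeHdfsStream extends FakeFsStream {
    enum SyncFlag { UPDATE_LENGTH }

    void hsync(EnumSet<SyncFlag> flags) { calls.add("hsync" + flags); }
}
```

This only guarantees that {{hsync()}} is called on every stream. What that call actually persists still depends on each file system's implementation, which is the root cause identified in the comment above.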
> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7737
>                 URL: https://issues.apache.org/jira/browse/FLINK-7737
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>         Environment: Dev
>            Reporter: Ryan Hobbs
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed
> that on HCFS systems where the data stream is of type FSDataOutputStream,
> Flink will issue hflush() and not hsync(), which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the
> output stream is of type *HdfsDataOutputStream* but not for streams of type
> *FSDataOutputStream*. Is this by design?
> {code}
>     protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>         try {
>             // At this point the refHflushOrSync cannot be null,
>             // since register method would have thrown if it was.
>             this.refHflushOrSync.invoke(os);
>             if (os instanceof HdfsDataOutputStream) {
>                 ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>             }
>         } catch (InvocationTargetException e) {
>             String msg = "Error while trying to hflushOrSync!";
>             LOG.error(msg + " " + e.getCause());
>             Throwable cause = e.getCause();
>             if (cause != null && cause instanceof IOException) {
>                 throw (IOException) cause;
>             }
>             throw new RuntimeException(msg, e);
>         } catch (Exception e) {
>             String msg = "Error while trying to hflushOrSync!";
>             LOG.error(msg + " " + e);
>             throw new RuntimeException(msg, e);
>         }
>     }
> {code}
> Could a potential fix be to perform a sync even on streams of type
> *FSDataOutputStream*?
> {code}
>     if (os instanceof HdfsDataOutputStream) {
>         ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>     } else if (os instanceof FSDataOutputStream) {
>         os.hsync();
>     }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)