[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16189049#comment-16189049
 ] 

Vijay Srinivasaraghavan commented on FLINK-7737:
------------------------------------------------

Some observations on the use of hflush vs. hsync. When an HCFS implementation 
is used as the backing filesystem, only hflush() is invoked, because hsync() is 
called only when the FSDataOutputStream is an instance of 
HdfsDataOutputStream. As a result, we are seeing data loss when the bucketing 
sink is holding data in the pending state and tries to close the stream (as 
part of TM failover recovery).

I do not see any issue with adding another condition so that hsync() is also 
called for HCFS stream types (plain FSDataOutputStream).
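
To make the proposed branching concrete, here is a minimal sketch that runs 
without Hadoop on the classpath. GenericStream and HdfsStream are hypothetical 
stand-ins for FSDataOutputStream and HdfsDataOutputStream, and 
hsyncUpdateLength() stands in for hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); 
none of these names exist in Flink or Hadoop. The stand-ins only record which 
calls were made, so this shows the dispatch, not real durability.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HflushOrSyncSketch {

    /** Hypothetical stand-in for a generic HCFS stream (FSDataOutputStream). */
    static class GenericStream {
        final List<String> calls = new ArrayList<>();

        void hflush() throws IOException { calls.add("hflush"); }

        void hsync() throws IOException { calls.add("hsync"); }
    }

    /** Hypothetical stand-in for the HDFS-specific stream (HdfsDataOutputStream). */
    static class HdfsStream extends GenericStream {
        // Stands in for hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)).
        void hsyncUpdateLength() throws IOException { calls.add("hsync(UPDATE_LENGTH)"); }
    }

    /** Flush first, then sync on every stream type, not only on HDFS streams. */
    static void hflushOrSync(GenericStream os) throws IOException {
        os.hflush();
        if (os instanceof HdfsStream) {
            // HDFS path: sync and update the visible file length.
            ((HdfsStream) os).hsyncUpdateLength();
        } else {
            // Added path: other HCFS streams get a plain hsync().
            os.hsync();
        }
    }

    public static void main(String[] args) throws IOException {
        GenericStream hcfs = new GenericStream();
        hflushOrSync(hcfs);
        System.out.println("HCFS stream calls: " + hcfs.calls);

        HdfsStream hdfs = new HdfsStream();
        hflushOrSync(hdfs);
        System.out.println("HDFS stream calls: " + hdfs.calls);
    }
}
```

Because every stream passed to the method is at least a generic stream, a 
plain else branch is enough in this sketch; the instanceof check is only needed 
to pick the HDFS-specific variant.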

[~rmetzger] Could you please take a look?

hflush() - This API flushes all outstanding data (i.e. the current unfinished 
packet) from the client into the OS buffers on all DataNode replicas.

hsync() - This API flushes the data to the DataNodes, like hflush(), but should 
also force the data to underlying physical storage via fsync (or equivalent).
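
As a rough local-file analogy (not the Hadoop API), the same distinction 
exists in plain Java: flush() on a buffered stream only hands the bytes to the 
operating system, while FileDescriptor.sync() asks the OS to force them to the 
storage device. The class and method names below are made up for illustration.

```java
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class FlushVsSync {

    /** Writes data, flushes it, optionally syncs it, and returns the file size. */
    static long writeAndFlush(Path file, byte[] data, boolean sync) throws IOException {
        try (FileOutputStream fos = new FileOutputStream(file.toFile());
             BufferedOutputStream out = new BufferedOutputStream(fos)) {
            out.write(data);
            // Roughly like hflush(): move bytes from the JVM buffer to the OS.
            // They may still sit in the OS page cache at this point.
            out.flush();
            if (sync) {
                // Roughly like hsync(): ask the OS to force the bytes to the
                // underlying device (fsync or equivalent).
                fos.getFD().sync();
            }
        }
        return Files.size(file);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("flush-vs-sync", ".bin");
        try {
            byte[] payload = "pending bucket data".getBytes(StandardCharsets.UTF_8);
            long size = writeAndFlush(tmp, payload, true);
            System.out.println("bytes in file after flush + sync: " + size);
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

Both variants leave the same bytes in the file during normal operation; the 
difference only shows up if the machine loses power or crashes before the OS 
writes its cache back, which is the failure window described in this issue.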

https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7737
>                 URL: https://issues.apache.org/jira/browse/FLINK-7737
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>         Environment: Dev
>            Reporter: Ryan Hobbs
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>     try {
>         // At this point the refHflushOrSync cannot be null,
>         // since register method would have thrown if it was.
>         this.refHflushOrSync.invoke(os);
>         if (os instanceof HdfsDataOutputStream) {
>             ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>         }
>     } catch (InvocationTargetException e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e.getCause());
>         Throwable cause = e.getCause();
>         if (cause != null && cause instanceof IOException) {
>             throw (IOException) cause;
>         }
>         throw new RuntimeException(msg, e);
>     } catch (Exception e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e);
>         throw new RuntimeException(msg, e);
>     }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>     ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>     os.hsync();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
