Hey Max, The solution I am proposing is not flushing on every record, but it makes sure to forward the flushing from the sinkfunction to the outputformat whenever it is triggered. Practically this means that the buffering is done (almost) solely in the sink and not in the outputformat any more.
On Mon, Oct 26, 2015 at 10:11 AM, Maximilian Michels <m...@apache.org> wrote: > Not sure whether we really want to flush at every invoke call. If you want > to flush every time, you may want to set the update condition to 0 > milliseconds. That way, flush will be called every time. In the API this is > exposed by using the FileSinkFunctionByMillis. If you flush every time, > performance might degrade. > > By the way, you may also use the RollingFileSink which splits the output > into several files for each hour/week/day. You can then be sure those files > are already completely written to HDFS. > > Best regards, > Max > > On Mon, Oct 26, 2015 at 8:36 AM, Márton Balassi <balassi.mar...@gmail.com> > wrote: > >> The problem persists in the current master, simply a format.flush() is >> needed here [1]. I'll do a quick hotfix, thanks for the report again! >> >> [1] >> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99 >> >> On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi <balassi.mar...@gmail.com >> > wrote: >> >>> Hey Rex, >>> >>> Writing half-baked records is definitely unwanted, thanks for spotting >>> this. Most likely it can be solved by adding a flush at the end of every >>> invoke call, let me check. >>> >>> Best, >>> >>> Marton >>> >>> On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge <lungoth...@gmail.com> wrote: >>> >>>> Hi, flinkers! >>>> >>>> I'm new to this whole thing, >>>> and it seems to me that >>>> ' org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String, >>>> WriteMode, long)' does not work properly. >>>> To be specific, data were not flushed by update frequency when write to >>>> HDFS. >>>> >>>> what make it more disturbing is that, if I check the content with 'hdfs >>>> dfs -cat xxx', sometimes I got partial records. >>>> >>>> >>>> I did a little digging in flink-0.9.1. >>>> And it turns out all >>>> 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)' >>>> does >>>> is pushing data to >>>> 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream' >>>> which is a delegate of 'org.apache.hadoop.fs.FSDataOutputStream'. >>>> >>>> In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never >>>> flushed. >>>> Which result in data being held in local buffer, and 'hdfs dfs -cat >>>> xxx' might return partial records. >>>> >>>> >>>> Does 'DataStream.writeAsCsv' suppose to work like this? Or I messed up >>>> somewhere? >>>> >>>> >>>> Best regards and thanks for your time! >>>> >>>> Rex >>>> >>> >>> >> >