That makes sense...

On Mon, Oct 26, 2015 at 12:31 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:
> Hey Max,
>
> The solution I am proposing is not flushing on every record, but making
> sure to forward the flush from the SinkFunction to the OutputFormat
> whenever it is triggered. Practically, this means that the buffering is
> done (almost) solely in the sink and no longer in the OutputFormat.
>
> On Mon, Oct 26, 2015 at 10:11 AM, Maximilian Michels <m...@apache.org> wrote:
>
>> I am not sure whether we really want to flush at every invoke call. If
>> you want to flush every time, you may want to set the update condition
>> to 0 milliseconds; that way, flush will be called on every invocation.
>> In the API this is exposed by the FileSinkFunctionByMillis. If you flush
>> every time, performance might degrade.
>>
>> By the way, you may also use the RollingFileSink, which splits the
>> output into several files for each hour/week/day. You can then be sure
>> those files have already been completely written to HDFS.
>>
>> Best regards,
>> Max
>>
>> On Mon, Oct 26, 2015 at 8:36 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>
>>> The problem persists in the current master; simply a format.flush() is
>>> needed here [1]. I'll do a quick hotfix. Thanks for the report again!
>>>
>>> [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99
>>>
>>> On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>
>>>> Hey Rex,
>>>>
>>>> Writing half-baked records is definitely unwanted; thanks for spotting
>>>> this. Most likely it can be solved by adding a flush at the end of
>>>> every invoke call. Let me check.
>>>>
>>>> Best,
>>>>
>>>> Marton
>>>>
>>>> On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge <lungoth...@gmail.com> wrote:
>>>>
>>>>> Hi, flinkers!
>>>>>
>>>>> I'm new to this whole thing, and it seems to me that
>>>>> 'org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String,
>>>>> WriteMode, long)' does not work properly. To be specific, data are
>>>>> not flushed at the update frequency when writing to HDFS.
>>>>>
>>>>> What makes it more disturbing is that if I check the content with
>>>>> 'hdfs dfs -cat xxx', I sometimes get partial records.
>>>>>
>>>>> I did a little digging in flink-0.9.1, and it turns out that all
>>>>> 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)'
>>>>> does is push data to
>>>>> 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream',
>>>>> which is a delegate of 'org.apache.hadoop.fs.FSDataOutputStream'.
>>>>>
>>>>> In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never
>>>>> flushed, which results in data being held in a local buffer, so
>>>>> 'hdfs dfs -cat xxx' might return partial records.
>>>>>
>>>>> Is 'DataStream.writeAsCsv' supposed to work like this? Or did I mess
>>>>> up somewhere?
>>>>>
>>>>> Best regards, and thanks for your time!
>>>>>
>>>>> Rex
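To make Rex's diagnosis concrete, here is a minimal, self-contained sketch of the buffering behavior, assuming the Hadoop 2.x client API (FileSystem, FSDataOutputStream, and hflush() are real Hadoop classes/methods; the path and payload are made up for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsBufferingDemo {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            FSDataOutputStream out = fs.create(new Path("/tmp/demo.csv"));
            try {
                // Bytes written here sit in client-side buffers; a concurrent
                // 'hdfs dfs -cat /tmp/demo.csv' may see nothing, or only a
                // prefix of the record.
                out.writeBytes("key,42\n");
                // hflush() (Hadoop 2.x) pushes the buffered bytes to the
                // datanodes so readers can see the complete record.
                out.hflush();
            } finally {
                out.close();
            }
        }
    }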
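The flush-forwarding Márton proposes can be illustrated with a sketch like the following. It is not the actual hotfix: the FlushableFormat interface and the field and helper names (tupleList, lastUpdate, updateMillis) are stand-ins invented for this example; only the idea of calling format.flush() after handing the buffered records over comes from the thread.

    import java.util.ArrayList;
    import java.util.List;

    // Stand-in for the format held by the sink; invented for this sketch.
    interface FlushableFormat<IN> {
        void writeRecord(IN record) throws Exception;
        void flush() throws Exception;
    }

    public class FlushForwardingSink<IN> {
        private final List<IN> tupleList = new ArrayList<IN>();
        private final FlushableFormat<IN> format;
        private final long updateMillis;
        private long lastUpdate = System.currentTimeMillis();

        public FlushForwardingSink(FlushableFormat<IN> format, long updateMillis) {
            this.format = format;
            this.updateMillis = updateMillis;
        }

        public void invoke(IN value) throws Exception {
            tupleList.add(value);                    // buffer in the sink
            if (System.currentTimeMillis() - lastUpdate >= updateMillis) {
                for (IN record : tupleList) {
                    format.writeRecord(record);      // hand records to the format
                }
                format.flush();                      // the missing call: forward the
                                                     // flush so nothing lingers in
                                                     // the format's buffers
                tupleList.clear();
                lastUpdate = System.currentTimeMillis();
            }
        }
    }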
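Max's 0-millisecond workaround would look roughly like this in user code. The writeAsCsv(String, WriteMode, long) overload is the one Rex names; the job itself is a made-up minimal example:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FlushEveryRecordJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Tuple2<String, Integer>> counts =
                    env.fromElements(new Tuple2<String, Integer>("a", 1),
                                     new Tuple2<String, Integer>("b", 2));
            // An update frequency of 0 ms makes the sink flush on every
            // invoke, at the cost of throughput.
            counts.writeAsCsv("hdfs:///tmp/out.csv", WriteMode.OVERWRITE, 0L);
            env.execute("flush-every-record example");
        }
    }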
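As for the rolling alternative: assuming Max means the RollingSink that ships in Flink's flink-connector-filesystem module (the class name there is RollingSink rather than RollingFileSink), usage looks roughly like this. The bucket path and date format are made up:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.fs.RollingSink;

    public class RollingSinkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> lines = env.fromElements("a,1", "b,2");

            // One bucket per hour: once the sink rolls to the next bucket,
            // the previous files are completely written to HDFS.
            RollingSink<String> sink =
                    new RollingSink<String>("hdfs:///tmp/rolling-out");
            sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH"));
            lines.addSink(sink);

            env.execute("rolling sink example");
        }
    }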