I'm not sure we really want to flush on every invoke call. If you want
to flush every time, you can set the update condition to 0 milliseconds,
so that flush is called for every record. In the API this is exposed
through FileSinkFunctionByMillis. Be aware that flushing every time
might degrade performance.
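
To illustrate the effect of the update condition, here is a minimal sketch
in plain java.io. It does not use Flink classes: MillisFlushingWriter and
FlushIntervalDemo are hypothetical names, and a ByteArrayOutputStream stands
in for the HDFS file. It only assumes that the sink flushes once at least
the configured number of milliseconds has passed since the last flush.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a file sink with a time-based update condition.
// This is NOT Flink's FileSinkFunctionByMillis, only an illustration of the idea.
class MillisFlushingWriter {
    private final Writer writer;
    private final long millis;
    private long lastFlushNanos = System.nanoTime();

    MillisFlushingWriter(Writer writer, long millis) {
        this.writer = writer;
        this.millis = millis;
    }

    // Write one record, then flush if at least `millis` ms passed since the last flush.
    void invoke(String record) throws IOException {
        writer.write(record);
        writer.write('\n');
        long elapsedMillis = (System.nanoTime() - lastFlushNanos) / 1_000_000L;
        if (elapsedMillis >= millis) { // always true when millis == 0
            writer.flush();
            lastFlushNanos = System.nanoTime();
        }
    }
}

class FlushIntervalDemo {
    // Writes three records and returns how many bytes a reader of the target can see.
    static int visibleBytes(long millis) {
        ByteArrayOutputStream target = new ByteArrayOutputStream(); // stands in for the HDFS file
        Writer buffered = new BufferedWriter(
                new OutputStreamWriter(target, StandardCharsets.UTF_8));
        MillisFlushingWriter sink = new MillisFlushingWriter(buffered, millis);
        try {
            sink.invoke("a,1");
            sink.invoke("b,2");
            sink.invoke("c,3");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Deliberately no close(): we want to observe what is visible mid-stream.
        return target.size();
    }

    public static void main(String[] args) {
        System.out.println("update condition 0 ms:   " + visibleBytes(0) + " bytes visible");
        System.out.println("update condition 1 hour: " + visibleBytes(3_600_000L) + " bytes visible");
    }
}
```

With a threshold of 0 every record reaches the target right away, at the
cost of one flush per record. With a large threshold the records stay in
the local buffer, which is the situation a concurrent reader would see.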

By the way, you can also use the RollingFileSink, which splits the output
into several files, one per hour/day/week. You can then be sure that the
files for completed periods are already fully written to HDFS.
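
The idea of time-based rolling can be sketched as follows. This is plain
Java, not the sink's actual code: HourlyBucketer, RollingBucketDemo, the
"yyyy-MM-dd--HH" directory layout, and the use of UTC are assumptions made
for illustration. It also assumes that the sink closes a file when it moves
on to the next bucket.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: maps a timestamp to an hourly output directory.
// The path layout is an assumption, not a documented format.
class HourlyBucketer {
    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    // Returns the directory that records with this timestamp would be written to.
    static String bucketPath(String basePath, Instant timestamp) {
        return basePath + "/" + HOUR.format(timestamp);
    }

    // A bucket counts as finished once the clock has moved on to a later bucket.
    static boolean isFinished(Instant recordTime, Instant now) {
        return now.isAfter(recordTime)
                && !HOUR.format(recordTime).equals(HOUR.format(now));
    }
}

class RollingBucketDemo {
    public static void main(String[] args) {
        Instant record = Instant.parse("2015-10-26T08:36:00Z");
        System.out.println(HourlyBucketer.bucketPath("/out", record));
        System.out.println(HourlyBucketer.isFinished(record, Instant.parse("2015-10-26T08:59:00Z")));
        System.out.println(HourlyBucketer.isFinished(record, Instant.parse("2015-10-26T09:00:00Z")));
    }
}
```

A downstream job would then only read directories for which isFinished
returns true, and leave the bucket that is still being written alone.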

Best regards,
Max

On Mon, Oct 26, 2015 at 8:36 AM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> The problem persists in the current master; a format.flush() is simply
> needed here [1]. I'll do a quick hotfix, thanks for the report again!
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99
>
> On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
>> Hey Rex,
>>
>> Writing half-baked records is definitely unwanted, thanks for spotting
>> this. Most likely it can be solved by adding a flush at the end of every
>> invoke call; let me check.
>>
>> Best,
>>
>> Marton
>>
>> On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge <lungoth...@gmail.com> wrote:
>>
>>> Hi, flinkers!
>>>
>>> I'm new to this whole thing,
>>> and it seems to me that
>>> 'org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String,
>>> WriteMode, long)' does not work properly.
>>> To be specific, data is not flushed at the update frequency when writing
>>> to HDFS.
>>>
>>> What makes it more disturbing is that, if I check the content with 'hdfs
>>> dfs -cat xxx', I sometimes get partial records.
>>>
>>>
>>> I did a little digging in flink-0.9.1,
>>> and it turns out that all
>>> 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)'
>>> does
>>> is push data to
>>> 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream',
>>> which delegates to 'org.apache.hadoop.fs.FSDataOutputStream'.
>>>
>>> In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never
>>> flushed,
>>> which results in data being held in a local buffer, so 'hdfs dfs -cat xxx'
>>> might return partial records.
>>>
>>>
>>> Is 'DataStream.writeAsCsv' supposed to work like this? Or did I mess up
>>> somewhere?
>>>
>>>
>>> Best regards and thanks for your time!
>>>
>>> Rex
>>>
>>
>>
>
