The problem persists in the current master, simply a format.flush() is
needed here [1]. I'll do a quick hotfix, thanks for the report again!

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99

On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> Hey Rex,
>
> Writing half-baked records is definitely unwanted, thanks for spotting
> this. Most likely it can be solved by adding a flush at the end of every
> invoke call, let me check.
>
> Best,
>
> Marton
>
> On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge <lungoth...@gmail.com> wrote:
>
>> Hi, flinkers!
>>
>> I'm new to this whole thing,
>> and it seems to me that
>> ' org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String,
>> WriteMode, long)' does not work properly.
>> To be specific, data were not flushed by update frequency when write to
>> HDFS.
>>
>> what make it more disturbing is that, if I check the content with 'hdfs
>> dfs -cat xxx', sometimes I got partial records.
>>
>>
>> I did a little digging in flink-0.9.1.
>> And it turns out all
>> 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)'
>> does
>> is pushing data to
>> 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream'
>> which is a delegate of  'org.apache.hadoop.fs.FSDataOutputStream'.
>>
>> In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never
>> flushed.
>> Which result in data being held in local buffer, and 'hdfs dfs -cat xxx'
>> might return partial records.
>>
>>
>> Does 'DataStream.writeAsCsv' suppose to work like this? Or I messed up
>> somewhere?
>>
>>
>> Best regards and thanks for your time!
>>
>> Rex
>>
>
>

Reply via email to