That makes sense...

On Mon, Oct 26, 2015 at 12:31 PM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> Hey Max,
>
> The solution I am proposing is not flushing on every record, but it makes
> sure to forward the flushing from the sinkfunction to the outputformat
> whenever it is triggered. Practically this means that the buffering is done
> (almost) solely in the sink and not in the outputformat any more.
>
> On Mon, Oct 26, 2015 at 10:11 AM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Not sure whether we really want to flush at every invoke call. If you
>> want to flush every time, you may want to set the update condition to 0
>> milliseconds. That way, flush will be called every time. In the API this is
>> exposed by using the FileSinkFunctionByMillis. If you flush every time,
>> performance might degrade.
>>
>> By the way, you may also use the RollingFileSink which splits the output
>> into several files for each hour/week/day. You can then be sure those files
>> are already completely written to HDFS.
>>
>> Best regards,
>> Max
>>
>> On Mon, Oct 26, 2015 at 8:36 AM, Márton Balassi <balassi.mar...@gmail.com>
>> wrote:
>>
>>> The problem persists in the current master; all that is needed is a
>>> format.flush() here [1]. I'll do a quick hotfix, thanks for the report again!
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99
>>>
>>> On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi <
>>> balassi.mar...@gmail.com> wrote:
>>>
>>>> Hey Rex,
>>>>
>>>> Writing half-baked records is definitely unwanted, thanks for spotting
>>>> this. Most likely it can be solved by adding a flush at the end of every
>>>> invoke call, let me check.
>>>>
>>>> Best,
>>>>
>>>> Marton
>>>>
>>>> On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge <lungoth...@gmail.com> wrote:
>>>>
>>>>> Hi, flinkers!
>>>>>
>>>>> I'm new to this whole thing,
>>>>> and it seems to me that
>>>>> 'org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String,
>>>>> WriteMode, long)' does not work properly.
>>>>> To be specific, data is not flushed at the update frequency when writing
>>>>> to HDFS.
>>>>>
>>>>> What makes it more disturbing is that, if I check the content with
>>>>> 'hdfs dfs -cat xxx', I sometimes get partial records.
>>>>>
>>>>>
>>>>> I did a little digging in flink-0.9.1.
>>>>> And it turns out all
>>>>> 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)'
>>>>> does
>>>>> is push data to
>>>>> 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream',
>>>>> which is a delegate of 'org.apache.hadoop.fs.FSDataOutputStream'.
>>>>>
>>>>> In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never
>>>>> flushed, which results in data being held in a local buffer, so
>>>>> 'hdfs dfs -cat xxx' might return partial records.
>>>>>
>>>>>
>>>>> Is 'DataStream.writeAsCsv' supposed to work like this? Or did I mess up
>>>>> somewhere?
>>>>>
>>>>>
>>>>> Best regards and thanks for your time!
>>>>>
>>>>> Rex
>>>>>
>>>>
>>>>
>>>
>>
>

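The behaviour discussed in the thread can be sketched as follows. This is only an illustration under stated assumptions, not the actual Flink code or the hotfix: BufferedFormat and ForwardingSink are hypothetical stand-ins for the output format and the millisecond-based file sink, a ByteArrayOutputStream plays the role of the HDFS file, and the clock is passed in explicitly to keep the example deterministic. The sink buffers records and, whenever its update condition triggers, writes them out and forwards the flush to the format, so a reader of the target never sees a half-written batch. With an update interval of 0 the condition holds on every invoke, which corresponds to flushing on every record.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FlushForwardingSketch {

    /** Hypothetical stand-in for an output format that writes through a buffered stream. */
    static class BufferedFormat {
        private final OutputStream out;

        BufferedFormat(OutputStream target) {
            // Large buffer: small writes stay in memory until flush() is called.
            this.out = new BufferedOutputStream(target, 64 * 1024);
        }

        void writeRecord(String record) {
            try {
                out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        void flush() {
            try {
                out.flush();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Hypothetical sink: buffers records and forwards every flush to the format. */
    static class ForwardingSink {
        private final BufferedFormat format;
        private final long updateMillis;
        private final List<String> buffer = new ArrayList<>();
        private long lastFlush;

        ForwardingSink(BufferedFormat format, long updateMillis, long now) {
            this.format = format;
            this.updateMillis = updateMillis;
            this.lastFlush = now;
        }

        void invoke(String record, long now) {
            buffer.add(record);
            // Update condition: with updateMillis == 0 this is true on every call.
            if (now - lastFlush >= updateMillis) {
                flushBuffer();
                lastFlush = now;
            }
        }

        private void flushBuffer() {
            for (String record : buffer) {
                format.writeRecord(record);
            }
            buffer.clear();
            // The forwarded flush: without it, bytes may sit in the format's stream buffer.
            format.flush();
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        ForwardingSink sink = new ForwardingSink(new BufferedFormat(target), 100L, 0L);

        sink.invoke("a,1", 10L);   // interval not reached, record stays in the sink
        System.out.println("bytes visible after first record: " + target.size());

        sink.invoke("b,2", 120L);  // interval reached, both records written and flushed
        System.out.println("bytes visible after second record: " + target.size());
    }
}
```

If the format.flush() call in flushBuffer() is removed, the second invoke still hands both records to the format, but nothing reaches the target until the stream buffer fills or the stream is closed, which matches the symptom reported at the bottom of the thread.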