Hi Robert!

I did some debugging and added some tests. Turns out, this is actually
expected behavior. It has to do with the splitting of the records.

Because the splits are created without knowing the file contents, a split
boundary can fall either in the middle of a record or (by chance) exactly
on a record boundary.

To make each split handle this consistently without knowing what the others
do, the contract is the following:
 - Each split but the first initially skips over records until it finds the
first delimiter.
 - Each split reads up to the next delimiter beyond the split boundary.

The case where the remaining split size is 0 is the point at which the split
still has to read one more record (or complete the current record), so it
fetches one more chunk of data.
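To make that concrete, here is a conceptual sketch of the contract in plain
Java. This is not the actual DelimitedInputFormat code; the byte-array input
and the helper method are made up purely for illustration:

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class SplitContractSketch {

        // Conceptual sketch only: which records a split [start, start+length)
        // owns, given a single-byte delimiter.
        static List<String> readSplit(byte[] file, int start, int length, byte delimiter) {
            List<String> records = new ArrayList<>();
            int pos = start;

            // Every split except the first skips ahead to the first delimiter:
            // the partial record at the split start belongs to the previous split.
            if (start != 0) {
                while (pos < file.length && file[pos] != delimiter) {
                    pos++;
                }
                pos++; // step over the delimiter itself
            }

            // Read records until the split is consumed, then keep reading past
            // the boundary until the current record is terminated. This read
            // beyond the boundary is the "remaining split length is 0" case:
            // the split still fetches more data to complete its last record.
            int end = start + length;
            while (pos <= end && pos < file.length) {
                int recordStart = pos;
                while (pos < file.length && file[pos] != delimiter) {
                    pos++;
                }
                records.add(new String(file, recordStart, pos - recordStart, StandardCharsets.UTF_8));
                pos++; // step over the delimiter
            }
            return records;
        }
    }

Because the previous split always reads past its boundary to the next
delimiter, and the following split always skips to that delimiter, every
record is read exactly once even though no split knows what the others do.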

The problem in your case is that the split size is so small that the "read
more data to complete the current record" step ends up reading the split
twice.

Can you reduce the read buffer size to something more reasonable? You could
also increase the split size. I think 128KB splits will result in high
coordination overhead for Flink, because splits are distributed via RPC
messages from the master (one message per split).
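As a rough sketch of how those two knobs could be tuned from the DataSet API
(I am assuming the setBufferSize() setter on DelimitedInputFormat and
setMinSplitSize() on FileInputFormat here; the path and the concrete numbers
are just placeholders for your setup):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;

    public class SplitTuningSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Placeholder path: point this at your XtreemFS file.
            TextInputFormat format = new TextInputFormat(new Path("xtreemfs:///path/to/words.txt"));

            // Shrink the read buffer so that completing the last record of a
            // split reads far less than a whole 128KB stripe.
            format.setBufferSize(4 * 1024);

            // And/or enforce larger splits to keep the per-split RPC overhead low.
            format.setMinSplitSize(1024 * 1024);

            env.createInput(format, BasicTypeInfo.STRING_TYPE_INFO)
               .first(10)
               .print();
        }
    }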

Greetings,
Stephan


On Fri, Jul 10, 2015 at 6:55 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Robert!
>
> This clearly sounds like unintended behavior. Thanks for reporting this.
>
> Apparently, the 0 line length was supposed to have a double meaning, but
> it goes haywire in this case.
>
> Let me try to come up with a fix for this...
>
> Greetings,
> Stephan
>
>
> On Fri, Jul 10, 2015 at 6:05 PM, Robert Schmidtke <ro.schmid...@gmail.com>
> wrote:
>
>> Hey everyone,
>>
>> I just noticed that, when processing input splits from a
>> DelimitedInputFormat (specifically, I have a text file with words in it),
>> if the splitLength is 0, the entire read buffer is filled (see
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java#L577).
>> I'm using XtreemFS as underlying file system, which stripes files in blocks
>> of 128kb across storage servers. I have 8 physically separate nodes, and my
>> input file is 1MB, such that each node stores 128kb of data. This is
>> reported accurately to Flink (e.g. split sizes and hostnames). Now when the
>> splitLength is 0 at some point during processing (which it will become
>> eventually), the entire file is read in again, which kind of defeats the
>> point of processing a split of length 0. Is this intended behavior? I've
>> tried multiple hot-fixes, but they resulted in the file not being read in
>> its entirety. I would like to know the rationale behind this
>> implementation, and maybe figure out a way around it. Thanks in advance,
>>
>> Robert
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>
