Hi Stephan,

I figured as much, since 128 KB is a split size that is not commonly used in
large-scale data processing engines. I will go for increasing the split size
to reduce the coordination overhead for Flink; it just so happened that my
small toy example brought up the issue. Thanks for clearing this up.
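For the archives, here is roughly the change I have in mind. This is a
minimal sketch against the DataSet API; the path is a placeholder for my
XtreemFS setup, and I'm assuming FileInputFormat's setMinSplitSize is the
right knob here:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

public class LargerSplits {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; in my setup XtreemFS stripes this 1 MB file
        // in 128 KB blocks across 8 storage servers.
        String path = "file:///data/words.txt";

        TextInputFormat format = new TextInputFormat(new Path(path));
        // Ask for splits of at least 1 MB instead of one split per 128 KB
        // block, so the master sends far fewer per-split RPC messages.
        format.setMinSplitSize(1024 * 1024);

        env.readFile(format, path).print();
    }
}

For my toy example that should put the whole file into a single split.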
Robert

On Sun, Jul 12, 2015 at 9:21 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Robert!
>
> I did some debugging and added some tests. Turns out, this is actually
> expected behavior. It has to do with the splitting of the records.
>
> Because creating the splits happens without knowing the contents, a split
> boundary can fall either in the middle of a record or (by chance) exactly
> at the boundary of a record.
>
> To make each split handle this consistently without knowing what the
> others do, the contract is the following:
> - Every split but the first initially skips over the data up to the first
> delimiter.
> - Each split reads up to the next delimiter beyond the split boundary.
>
> The case when the remaining split size is 0 is the point when the split
> has to read one more record (or complete the current record), so it gets
> one more chunk of data.
>
> The problem in your case is actually that the split size is so low that
> the "fill the read buffer to complete the current record" operation reads
> the split twice.
>
> Can you reduce the buffer size to something reasonable? You can also
> increase the split size. I think 128 KB splits will result in high
> coordination overhead for Flink, because splits are distributed with RPC
> messages from the master (1 message per split).
>
> Greetings,
> Stephan
>
>
> On Fri, Jul 10, 2015 at 6:55 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Robert!
>>
>> This clearly sounds like unintended behavior. Thanks for reporting this.
>>
>> Apparently, the 0 line length was supposed to have a double meaning, but
>> it goes haywire in this case.
>>
>> Let me try to come up with a fix for this...
>>
>> Greetings,
>> Stephan
>>
>>
>> On Fri, Jul 10, 2015 at 6:05 PM, Robert Schmidtke
>> <ro.schmid...@gmail.com> wrote:
>>
>>> Hey everyone,
>>>
>>> I just noticed that when processing input splits from a
>>> DelimitedInputFormat (specifically, I have a text file with words in
>>> it), if the splitLength is 0, the entire read buffer is filled (see
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java#L577).
>>> I'm using XtreemFS as the underlying file system, which stripes files
>>> in blocks of 128 KB across storage servers. I have 8 physically
>>> separate nodes, and my input file is 1 MB, such that each node stores
>>> 128 KB of data. This is reported accurately to Flink (e.g. split sizes
>>> and hostnames). Now when the splitLength is 0 at some point during
>>> processing (which it will become eventually), the entire file is read
>>> in again, which kind of defeats the point of processing a split of
>>> length 0. Is this intended behavior? I've tried multiple hot-fixes, but
>>> they ended up with the file not being read in its entirety. I would
>>> like to know the rationale behind this implementation, and maybe figure
>>> out a way around it. Thanks in advance,
>>>
>>> Robert
>>>
>>> --
>>> My GPG Key ID: 336E2680

--
My GPG Key ID: 336E2680
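P.S. for anyone who finds this thread later: below is a toy illustration of
the contract Stephan describes. It is a hypothetical standalone helper, not
the actual DelimitedInputFormat code, and it assumes a single-byte delimiter:

import java.util.ArrayList;
import java.util.List;

public class SplitContractSketch {

    /** Returns the records owned by the split [start, start + length). */
    static List<String> readSplit(byte[] file, int start, int length, byte delim) {
        List<String> records = new ArrayList<>();
        int pos = start;
        int end = start + length;
        // Every split but the first skips ahead past the first delimiter:
        // those bytes are the tail of a record that the previous split
        // already read across its own boundary.
        if (start > 0) {
            while (pos < file.length && file[pos] != delim) pos++;
            pos++; // step over the delimiter itself
        }
        // A record is owned by the split in which it starts; the last
        // record may be read beyond the split boundary to its delimiter.
        while (pos < end && pos < file.length) {
            int recordStart = pos;
            while (pos < file.length && file[pos] != delim) pos++;
            records.add(new String(file, recordStart, pos - recordStart));
            pos++; // step over the delimiter
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] file = "alpha\nbravo\ncharlie\ndelta\n".getBytes();
        System.out.println(readSplit(file, 0, 13, (byte) '\n'));  // [alpha, bravo, charlie]
        System.out.println(readSplit(file, 13, 13, (byte) '\n')); // [delta]
    }
}

Running it over two artificial 13-byte splits of the 26-byte file shows the
hand-off: "charlie" starts in split 1 and crosses the boundary, so split 1
reads it to completion and split 2 skips its tail.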