Robert - you're right, but this is a pathological case. It signals that there *might* be cases where we'll need to scan the whole file; however, for practical purposes what matters more is whether we need to scan the whole file in *all* (or most) cases, i.e. whether no amount of backward scanning of a non-pathological file can give us confidence that we've truly located a record boundary.
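To make Robert's counterexample concrete, here is a minimal plain-Java sketch. It uses no Beam APIs, the class and method names are hypothetical, and it assumes LF (including the LF of CRLF) is the record terminator. A scan that starts near the offset has to assume a quote state; the two inputs below contain identical bytes after the chosen offset, yet have different correct answers:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not part of Beam or of the gist under discussion.
final class CsvBoundaryAmbiguity {

  /** Correct but expensive: tracks RFC 4180 quote state from byte 0. Returns data.length if none. */
  static int firstRecordStartFromFileStart(byte[] data, int offset) {
    return scan(data, 0, offset);
  }

  /** Cheap but unsafe: starts just before offset and ASSUMES that position is outside quotes. */
  static int firstRecordStartLocal(byte[] data, int offset) {
    return scan(data, Math.max(0, offset - 1), offset);
  }

  /** Scans from scanFrom (assumed outside quotes) and returns the first record start >= offset. */
  private static int scan(byte[] data, int scanFrom, int offset) {
    if (offset <= 0) return 0;
    boolean inQuotes = false;
    for (int i = scanFrom; i < data.length; i++) {
      if (data[i] == '"') {
        inQuotes = !inQuotes;                 // an escaped "" toggles twice, so parity holds
      } else if (data[i] == '\n' && !inQuotes && i + 1 >= offset) {
        return i + 1;                         // unquoted LF ends a record; the next starts here
      }
    }
    return data.length;
  }

  public static void main(String[] args) {
    byte[] plain = "a,b\nc,d\n".getBytes(StandardCharsets.US_ASCII);
    byte[] quoted = "\"a,b\nc,d\n\"".getBytes(StandardCharsets.US_ASCII); // same file, wrapped in quotes

    // Offset 2 in plain and offset 3 in quoted both point at "b", followed by the same bytes.
    System.out.println(firstRecordStartFromFileStart(plain, 2));   // 4
    System.out.println(firstRecordStartLocal(plain, 2));           // 4
    System.out.println(firstRecordStartFromFileStart(quoted, 3));  // 10: the file is one record
    System.out.println(firstRecordStartLocal(quoted, 3));          // 5: wrong
  }
}
```

The local scanner reads only a few bytes but reports a record start at byte 5 in the quoted file, even though that file is a single record; the from-the-start scan gets both files right at the cost of reading the whole prefix.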
On Tue, Apr 24, 2018 at 3:21 PM Robert Bradshaw <[email protected]> wrote:

> On Tue, Apr 24, 2018 at 3:18 PM Eugene Kirpichov <[email protected]> wrote:
>
> > I think the first question that has to be answered here is: Is it possible *at all* to implement parallel reading of RFC 4180?
>
> No. Consider a multi-record CSV file with no quotes. Placing a quote at the start and end gives a new CSV file with exactly one element.
>
> > I.e., given a start byte offset, is it possible to reliably locate the first record boundary at or after that offset while scanning only a small amount of data?
> >
> > If it is possible, then that's what the SDF (or BoundedSource, etc.) should do - split into blind byte ranges, and use this algorithm to assign consistent meaning to byte ranges.
> >
> > To answer your questions 2 and 3: think of it this way.
> >
> > The SDF's ProcessElement takes an element and a restriction.
> >
> > ProcessElement must make only one promise: that it will correctly perform exactly the work associated with this element and restriction.
> >
> > The challenge is that the restriction can become smaller while ProcessElement runs, in which case ProcessElement must also do less work. This can happen concurrently with ProcessElement running, so really the guarantee should be rephrased as "By the time ProcessElement completes, it should have performed exactly the work associated with the element and tracker.currentRestriction() at the moment of completion".
> >
> > This is all that is asked of ProcessElement. If Beam decides to ask the tracker to split itself into two ranges (making the current one - "primary" - smaller, and producing an additional one - "residual"), Beam of course takes responsibility for executing the residual restriction somewhere else: it won't be lost.
> >
> > E.g. if ProcessElement was invoked with [a, b), but while it was running it was split into [a, b-100) and [b-100, b), then the current ProcessElement call must process [a, b-100), and Beam guarantees that it will fire up another ProcessElement call for [b-100, b). (Of course, both of these calls may end up being recursively split further.)
> >
> > I'm not quite sure what you mean by "recombining" - please let me know if the explanation above makes things clear enough or not.
> >
> > On Tue, Apr 24, 2018 at 2:55 PM Peter Brumblay <[email protected]> wrote:
> >
> >> Hi Eugene, thank you for the feedback!
> >>
> >> TextIO.read() can't handle RFC 4180 in full (at least I don't think it does!) - we have a lot of source data with embedded newlines. These records get split improperly because TextIO.read() blindly looks for newline characters. We need something which natively understands embedded newlines in quoted fields, like so:
> >>
> >> foo,bar,"this has an\r\nembedded newline",192928\r\n
> >>
> >> As for the other feedback:
> >>
> >> 1. Claiming the entire range - yes, I figured this was a major mistake. Thanks for the confirmation.
> >> 2. The code for initial splitting of the restriction seems very complex...
> >>
> >> Follow-up question: if I process (and claim) only a subset of a range, say [a, b - 100), and [b - 100, b) represents an incomplete block, will Beam SDF dynamically recombine ranges such that [b - 100, b + N) is sent to a worker with a (potentially) complete block?
> >>
> >> 3. Fine-tuning the evenness... if Beam SDF re-combines ranges for split blocks, then it sounds like arbitrary splits in splitFunction() make more sense.
> >>
> >> I'll try to take another pass at this with your feedback in mind.
> >>
> >> Peter
> >>
> >> On Tue, Apr 24, 2018 at 3:08 PM, Eugene Kirpichov <[email protected]> wrote:
> >>
> >>> Hi Peter,
> >>>
> >>> Thanks for experimenting with SDF!
> >>> However, in this particular case: any reason why you can't just use TextIO.read() and parse each line as CSV? Seems like that would require considerably less code.
> >>>
> >>> A few comments on this code per se:
> >>> - The ProcessElement implementation immediately claims the entire range, which means that there can be no dynamic splitting and the code behaves equivalently to a regular DoFn.
> >>> - The code for initial splitting of the restriction seems very complex - can you just split it blindly into a bunch of byte ranges of about equal size? Looking at the actual data while splitting should never be necessary - you should be able to just look at the file size (say, 100MB) and split it into a bunch of splits, say, [0, 10MB), [10MB, 20MB) etc.
> >>> - It seems that the splitting code tries to align splits with record boundaries - this is not useful: it does not matter whether the split boundaries fall onto record boundaries or not; instead, the reading code should be able to read an arbitrary range of bytes in a meaningful way. That typically means that reading [a, b) means "start at the first record boundary located at or after a, and end at the first record boundary located at or after b".
> >>> - Fine-tuning the evenness of initial splitting is also not useful: dynamic splitting will even things out anyway; moreover, even if you are able to achieve an equal amount of data read by different restrictions, it does not translate into equal time to process the data with the ParDos fused into the same bundle (and that time is unpredictable).
> >>>
> >>> On Tue, Apr 24, 2018 at 1:24 PM Peter Brumblay <[email protected]> wrote:
> >>>
> >>>> Hi All,
> >>>>
> >>>> I noticed that there is no support for CSV file reading (e.g. RFC 4180) in Apache Beam - at least no native transform. There's an issue to add this support: https://issues.apache.org/jira/browse/BEAM-51.
> >>>>
> >>>> I've seen examples which use the Apache Commons CSV parser. I took a shot at implementing a SplittableDoFn transform. I have the full code and some questions in a gist here: https://gist.github.com/pbrumblay/9474dcc6cd238c3f1d26d869a20e863d.
> >>>>
> >>>> I suspect it could be improved quite a bit. If anyone has time to provide feedback I would really appreciate it.
> >>>>
> >>>> Regards,
> >>>>
> >>>> Peter Brumblay
> >>>> Fearless Technology Group, Inc.
