I think I understand. Once split, ranges are never merged. My question was about whether the runtime would ever merge two ranges again. It sounds like they can only be processed as a whole or further split.
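Below is a minimal plain-Java sketch of the "split, never merged" behaviour described above. It does not use Beam. `ByteRangeTracker`, `tryClaim` and `trySplitAt` are hypothetical names, loosely modelled on an offset-range restriction tracker. A split shrinks the primary range in place and returns the residual, and there is no merge operation. In this sketch, `null` is the assumed signal for "cannot split". Whether a given Beam SDK version uses the same convention in its own split methods should be checked against that version's RestrictionTracker documentation.

```java
import java.util.ArrayList;
import java.util.List;

public class ByteRangeTrackerDemo {

    /** Half-open byte range [from, to). */
    static final class Range {
        final long from, to;
        Range(long from, long to) { this.from = from; this.to = to; }
        @Override public String toString() { return "[" + from + ", " + to + ")"; }
    }

    /** Hypothetical tracker: offsets are claimed in increasing order; the range can shrink, never grow. */
    static final class ByteRangeTracker {
        private final long from;
        private long to;                // shrinks when a split succeeds
        private long lastClaimed = -1;  // -1 means nothing claimed yet

        ByteRangeTracker(Range r) { this.from = r.from; this.to = r.to; }

        synchronized Range currentRestriction() { return new Range(from, to); }

        /** Claims the record starting at offset; returns false once offset is outside the current range. */
        synchronized boolean tryClaim(long offset) {
            if (offset <= lastClaimed) throw new IllegalArgumentException("offsets must increase");
            if (offset >= to) return false;
            lastClaimed = offset;
            return true;
        }

        /**
         * Shrinks this range to [from, splitPos) and returns the residual [splitPos, to).
         * Returns null ("cannot split") if either side would be empty or if the
         * split would give away an offset that has already been claimed.
         */
        synchronized Range trySplitAt(long splitPos) {
            if (splitPos <= from || splitPos <= lastClaimed || splitPos >= to) return null;
            Range residual = new Range(splitPos, to);
            to = splitPos;
            return residual;
        }
    }

    public static void main(String[] args) {
        ByteRangeTracker tracker = new ByteRangeTracker(new Range(0, 1000));
        List<Long> done = new ArrayList<>();
        Range residual = null;
        // Records start every 100 bytes; "processing" a record just records its offset.
        for (long offset = 0; tracker.tryClaim(offset); offset += 100) {
            done.add(offset);
            if (offset == 300) {
                // Simulates the runner splitting the restriction while ProcessElement runs.
                residual = tracker.trySplitAt(900);
            }
        }
        System.out.println("primary " + tracker.currentRestriction() + " processed " + done);
        System.out.println("residual " + residual + " goes to a separate call and is never merged back");
    }
}
```

Running `main` shows the primary ending at [0, 900) after processing offsets 0 through 800. The range [900, 1000) is left for a separate call.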

Is it appropriate to return the same range from splitFunction() if it
*cannot* be split further? Or is there some other indicator / return value
we should use to indicate that it can't be split?

Peter

On Tue, Apr 24, 2018 at 4:18 PM, Eugene Kirpichov wrote:

> I think the first question that has to be answered here is: Is it possible
> *at all* to implement parallel reading of RFC 4180?
>
> I.e., given a start byte offset, is it possible to reliably locate the
> first record boundary at or after that offset while scanning only a small
> amount of data?
> If it is possible, then that's what the SDF (or BoundedSource, etc.)
> should do: split into blind byte ranges, and use this algorithm to assign
> consistent meaning to byte ranges.
>
> To answer your questions 2 and 3, think of it this way.
> The SDF's ProcessElement takes an element and a restriction.
> ProcessElement must make only one promise: that it will correctly perform
> exactly the work associated with this element and restriction.
> The challenge is that the restriction can become smaller while
> ProcessElement runs, in which case ProcessElement must also do less
> work. This can happen concurrently with ProcessElement running, so really
> the guarantee should be rephrased as "By the time ProcessElement completes,
> it should have performed exactly the work associated with the element and
> tracker.currentRestriction() at the moment of completion".
>
> This is all that is asked of ProcessElement. If Beam decides to ask the
> tracker to split itself into two ranges (making the current one, the
> "primary", smaller, and producing an additional one, the "residual"), Beam
> of course takes the responsibility for executing the residual restriction
> somewhere else: it won't be lost.
>
> E.g. if ProcessElement was invoked with [a, b), but while it was running
> it was split into [a, b-100) and [b-100, b), then the current
> ProcessElement call must process [a, b-100), and Beam guarantees that it
> will fire up another ProcessElement call for [b-100, b). (Of course, both
> of these calls may end up being recursively split further.)
>
> I'm not quite sure what you mean by "recombining". Please let me know if
> the explanation above makes things clear enough or not.
>
> On Tue, Apr 24, 2018 at 2:55 PM Peter Brumblay wrote:
>
>> Hi Eugene, thank you for the feedback!
>>
>> TextIO.read() can't handle RFC 4180 in full (at least I don't think it
>> does!): we have a lot of source data with embedded newlines. These
>> records get split improperly because TextIO.read() blindly looks for
>> newline characters. We need something that natively understands embedded
>> newlines in quoted fields, like so:
>>
>> foo,bar,"this has an\r\nembedded newline",192928\r\n
>>
>> As for the other feedback:
>>
>> 1. Claiming the entire range: yes, I figured this was a major mistake.
>> Thanks for the confirmation.
>> 2. The code for initial splitting of the restriction seems very complex...
>>
>> Follow-up question: if I process (and claim) only a subset of a range,
>> say [a, b - 100), and [b - 100, b) represents an incomplete block, will
>> Beam SDF dynamically recombine ranges such that [b - 100, b + N) is sent
>> to a worker with a (potentially) complete block?
>>
>> 3. Fine-tuning the evenness... if Beam SDF recombines ranges for split
>> blocks, then it sounds like arbitrary splits in splitFunction() make more
>> sense.
>>
>> I'll try to take another pass at this with your feedback in mind.
>>
>> Peter
>>
>> On Tue, Apr 24, 2018 at 3:08 PM, Eugene Kirpichov wrote:
>>
>>> Hi Peter,
>>>
>>> Thanks for experimenting with SDF!
>>> However, in this particular case: any reason why you can't just use
>>> TextIO.read() and parse each line as CSV? Seems like that would require
>>> considerably less code.
>>>
>>> A few comments on this code per se:
>>> - The ProcessElement implementation immediately claims the entire range,
>>> which means that there can be no dynamic splitting and the code behaves
>>> equivalently to a regular DoFn.
>>> - The code for initial splitting of the restriction seems very complex.
>>> Can you just split it blindly into a bunch of byte ranges of about equal
>>> size? Looking at the actual data while splitting should never be
>>> necessary: you should be able to just look at the file size (say, 100MB)
>>> and split it into a bunch of splits, say, [0, 10MB), [10MB, 20MB) etc.
>>> - It seems that the splitting code tries to align splits with record
>>> boundaries. This is not useful: it does not matter whether the split
>>> boundaries fall onto record boundaries or not; instead, the reading code
>>> should be able to read an arbitrary range of bytes in a meaningful way.
>>> That typically means that reading [a, b) means "start at the first record
>>> boundary located at or after a, end at the first record boundary located
>>> at or after b".
>>> - Fine-tuning the evenness of initial splitting is also not useful:
>>> dynamic splitting will even things out anyway. Moreover, even if you are
>>> able to achieve an equal amount of data read by different restrictions, it
>>> does not translate into equal time to process the data with the ParDos
>>> fused into the same bundle (and that time is unpredictable).
>>>
>>> On Tue, Apr 24, 2018 at 1:24 PM Peter Brumblay wrote:
>>>
>>>> Hi All,
>>>>
>>>> I noticed that there is no support for CSV file reading (e.g. RFC 4180)
>>>> in Apache Beam, at least no native transform. There's an issue to add
>>>> this support: https://issues.apache.org/jira/browse/BEAM-51.
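The embedded-newline problem raised in this thread can be reproduced with a small sketch. It is not a complete RFC 4180 parser and does not use the Apache Commons CSV API. The hypothetical `recordStarts` method scans from byte 0 and tracks whether it is inside a double-quoted field. It ends a record only at a line break outside quotes. The sample reuses the record `foo,bar,"this has an\r\nembedded newline",192928` from the thread plus an assumed second record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class Rfc4180Records {

    /**
     * Returns the byte offsets at which records start, scanning from offset 0.
     * A '\n' ends a record only outside quotes (a bare LF is accepted too, which is
     * more lenient than RFC 4180's CRLF). An escaped quote ("") toggles the state
     * twice, leaving it unchanged. Scanning bytes is safe for UTF-8 input because
     * the ASCII bytes '"' and '\n' never occur inside multi-byte sequences.
     */
    static List<Integer> recordStarts(byte[] data) {
        List<Integer> starts = new ArrayList<>();
        boolean inQuotes = false;
        boolean atRecordStart = true;
        for (int i = 0; i < data.length; i++) {
            if (atRecordStart) {
                starts.add(i);
                atRecordStart = false;
            }
            byte b = data[i];
            if (b == '"') {
                inQuotes = !inQuotes;
            } else if (b == '\n' && !inQuotes) {
                atRecordStart = true;  // the next byte, if any, begins a new record
            }
        }
        return starts;
    }

    /** Slices the data into records (each including its line terminator). */
    static List<String> records(byte[] data) {
        List<Integer> starts = recordStarts(data);
        List<String> out = new ArrayList<>();
        for (int k = 0; k < starts.size(); k++) {
            int from = starts.get(k);
            int to = k + 1 < starts.size() ? starts.get(k + 1) : data.length;
            out.add(new String(data, from, to - from, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        String sample = "foo,bar,\"this has an\r\nembedded newline\",192928\r\n"
                + "baz,qux,plain,7\r\n";
        byte[] data = sample.getBytes(StandardCharsets.UTF_8);
        // Naive approach: every CRLF is treated as a record separator.
        System.out.println("naive line split: " + sample.split("\r\n").length + " pieces");
        // Quote-aware approach: a CRLF inside quotes is field data.
        System.out.println("quote-aware scan: " + records(data).size() + " records");
    }
}
```

The naive CRLF split yields 3 pieces, while the quote-aware scan yields 2 records. Because the scan always starts at byte 0, it does not answer the harder question in the thread. That question is whether a reader dropped at an arbitrary offset can tell if it is inside quotes.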
>>>>
>>>> I've seen examples which use the Apache Commons CSV parser. I took a
>>>> shot at implementing a SplittableDoFn transform. I have the full code and
>>>> some questions in a gist here:
>>>> https://gist.github.com/pbrumblay/9474dcc6cd238c3f1d26d869a20e863d
>>>>
>>>> I suspect it could be improved quite a bit. If anyone has time to
>>>> provide feedback I would really appreciate it.
>>>>
>>>> Regards,
>>>>
>>>> Peter Brumblay
>>>> Fearless Technology Group, Inc.
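Eugene suggested splitting blindly into byte ranges and giving each range a consistent meaning. A small in-memory sketch can check that approach. `blindSplits` and `recordsOwnedBy` are hypothetical helpers. The first cuts [0, fileSize) into fixed-size chunks without reading any data. The second treats a range [a, b) as owning every record whose start offset s satisfies a <= s < b. That is the same as reading from the first record boundary at or after a to the first boundary at or after b. The record start offsets are illustrative values and are assumed to be known. Locating them from an arbitrary offset in RFC 4180 data is exactly the open question in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class BlindSplits {

    /** Half-open byte range [from, to). */
    static final class Range {
        final long from, to;
        Range(long from, long to) { this.from = from; this.to = to; }
        @Override public String toString() { return "[" + from + ", " + to + ")"; }
    }

    /** Cuts [0, fileSize) into chunks of chunkSize bytes (last one shorter) without looking at the data. */
    static List<Range> blindSplits(long fileSize, long chunkSize) {
        List<Range> out = new ArrayList<>();
        for (long from = 0; from < fileSize; from += chunkSize) {
            out.add(new Range(from, Math.min(from + chunkSize, fileSize)));
        }
        return out;
    }

    /**
     * Records owned by r: those whose start offset s satisfies r.from <= s < r.to.
     * A reader may read past r.to to finish its last record, but never starts one there.
     */
    static List<Long> recordsOwnedBy(Range r, long[] recordStarts) {
        List<Long> owned = new ArrayList<>();
        for (long s : recordStarts) {
            if (s >= r.from && s < r.to) owned.add(s);
        }
        return owned;
    }

    /** Reads every blind split in order and concatenates the records each one owns. */
    static List<Long> readAll(long fileSize, long chunkSize, long[] recordStarts) {
        List<Long> all = new ArrayList<>();
        for (Range r : blindSplits(fileSize, chunkSize)) {
            all.addAll(recordsOwnedBy(r, recordStarts));
        }
        return all;
    }

    public static void main(String[] args) {
        long[] starts = {0, 48, 65, 130};  // illustrative record start offsets in a 150-byte file
        for (long chunk : new long[] {7, 10, 64, 1000}) {
            System.out.println("chunk=" + chunk + " -> " + readAll(150, chunk, starts));
        }
    }
}
```

Every chunk size produces the same list of record starts, each exactly once. Ranges that contain no record start, such as [10, 20) with a chunk size of 10, read nothing. This is why the initial split boundaries do not need to line up with records.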
