This blog post was an excellent find. If I had infinite time I'd take a
stab at implementing this. They basically outline an algorithm which
*might* be appropriate for a generalized solution. It certainly beats my
"try to parse 3 records and, if you succeed, assume you're good" method.

Peter

On Tue, Apr 24, 2018 at 4:46 PM, Eugene Kirpichov <[email protected]>
wrote:

> Actually, you're right, this is not a pathological case. If we take a
> regular 1TB-sized CSV file that actually doesn't have any quotes, and
> start looking somewhere in the middle of it, there is no way to know
> whether we're currently inside or outside quotes without scanning the
> whole file - in theory there might be a quote lurking a few GB back. I
> suppose this can be addressed by specifying limits on field sizes in
> bytes: e.g. with a limit of 1kb, if there are no quotes in the preceding
> 1kb, then we're definitely in an unquoted context. However, if there is a
> quote, it may be either opening or closing the quoted context. There
> might be some way to resolve the ambiguity;
> https://blog.etleap.com/2016/11/27/distributed-csv-parsing/ seems to
> discuss this in detail.
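>
> A rough sketch of that back-scan check (the helper name, byte-array
> signature, and maxFieldBytes parameter are illustrative assumptions):
>
>   // If no quote appears within maxFieldBytes before `offset`, no quoted
>   // field can span this point, so the offset is provably outside quotes;
>   // if a quote does appear, the context remains ambiguous.
>   static boolean definitelyUnquoted(byte[] data, int offset, int maxFieldBytes) {
>     int windowStart = Math.max(0, offset - maxFieldBytes);
>     for (int i = windowStart; i < offset; i++) {
>       if (data[i] == '"') {
>         return false; // could be an opening or a closing quote
>       }
>     }
>     return true;
>   }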
>
> On Tue, Apr 24, 2018 at 3:26 PM Eugene Kirpichov <[email protected]>
> wrote:
>
>> Robert - you're right, but this is a pathological case. It signals that
>> there *might* be cases where we'll need to scan the whole file; however,
>> for practical purposes it's more important whether we need to scan the
>> whole file in *all* (or most) cases - i.e. whether no amount of backward
>> scanning of a non-pathological file can give us confidence that we're
>> truly located at a record boundary.
>>
>> On Tue, Apr 24, 2018 at 3:21 PM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> On Tue, Apr 24, 2018 at 3:18 PM Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>> > I think the first question that has to be answered here is: Is it
>>> > possible *at all* to implement parallel reading of RFC 4180?
>>>
>>> No. Consider a multi-record CSV file with no quotes. Placing a quote at
>>> the start and end gives a new CSV file with exactly one element.
>>>
>>> > I.e., given a start byte offset, is it possible to reliably locate
>>> > the first record boundary at or after that offset while scanning only
>>> > a small amount of data?
>>> > If it is possible, then that's what the SDF (or BoundedSource, etc.)
>>> > should do - split into blind byte ranges, and use this algorithm to
>>> > assign consistent meaning to byte ranges.
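>>> >
>>> > A minimal sketch of that blind splitting with Beam's SDF annotations
>>> > (OffsetRange is a real SDK class; the annotation signatures have
>>> > evolved since this thread, and SPLIT_SIZE_BYTES is an assumed
>>> > constant, e.g. 10MB):
>>> >
>>> >   @SplitRestriction
>>> >   public void splitRestriction(
>>> >       @Restriction OffsetRange range, OutputReceiver<OffsetRange> out) {
>>> >     // Chop the file's byte range into ~equal pieces without looking
>>> >     // at the data; the reading code gives each range meaning later.
>>> >     for (long start = range.getFrom(); start < range.getTo(); ) {
>>> >       long end = Math.min(start + SPLIT_SIZE_BYTES, range.getTo());
>>> >       out.output(new OffsetRange(start, end));
>>> >       start = end;
>>> >     }
>>> >   }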
>>>
>>> > To answer your questions 2 and 3: think of it this way.
>>> > The SDF's ProcessElement takes an element and a restriction.
>>> > ProcessElement must make only one promise: that it will correctly
>>> > perform exactly the work associated with this element and restriction.
>>> > The challenge is that the restriction can become smaller while
>>> > ProcessElement runs - in which case, ProcessElement must also do less
>>> > work. This can happen concurrently with ProcessElement running, so
>>> > really the guarantee should be rephrased as "By the time
>>> > ProcessElement completes, it should have performed exactly the work
>>> > associated with the element and tracker.currentRestriction() at the
>>> > moment of completion".
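>>> >
>>> > In code, that contract looks roughly like this (readRecordAt() and
>>> > Record are hypothetical stand-ins for the actual record reader, not
>>> > Beam APIs):
>>> >
>>> >   @ProcessElement
>>> >   public void processElement(
>>> >       ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
>>> >     long offset = tracker.currentRestriction().getFrom();
>>> >     // Claim each record's start offset before emitting it. A failed
>>> >     // tryClaim() means the tracker was split (or the range is done);
>>> >     // the unclaimed remainder is the residual Beam runs elsewhere.
>>> >     while (tracker.tryClaim(offset)) {
>>> >       Record record = readRecordAt(offset); // hypothetical
>>> >       c.output(record.value());
>>> >       offset = record.nextOffset();
>>> >     }
>>> >   }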
>>>
>>> > This is all that is asked of ProcessElement. If Beam decides to ask
>>> > the tracker to split itself into two ranges (making the current one -
>>> > "primary" - smaller, and producing an additional one - "residual"),
>>> > Beam of course takes the responsibility for executing the residual
>>> > restriction somewhere else: it won't be lost.
>>>
>>> > E.g. if ProcessElement was invoked with [a, b), but while it was
>>> > invoked it was split into [a, b-100) and [b-100, b), then the current
>>> > ProcessElement call must process [a, b-100), and Beam guarantees that
>>> > it will fire up another ProcessElement call for [b-100, b) (of course,
>>> > both of these calls may end up being recursively split further).
>>>
>>> > I'm not quite sure what you mean by "recombining" - please let me
>>> > know if the explanation above makes things clear enough or not.
>>>
>>> > On Tue, Apr 24, 2018 at 2:55 PM Peter Brumblay <[email protected]>
>>> > wrote:
>>>
>>> >> Hi Eugene, thank you for the feedback!
>>>
>>> >> TextIO.read() can't handle RFC 4180 in full (at least I don't think
>>> >> it does!) - we have a lot of source data with embedded newlines.
>>> >> These records get split improperly because TextIO.read() blindly
>>> >> looks for newline characters. We need something that natively
>>> >> understands embedded newlines in quoted fields ... like so:
>>>
>>> >> foo,bar,"this has an\r\nembedded newline",192928\r\n
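>>> >>
>>> >> (For what it's worth, Apache Commons CSV handles exactly this; a
>>> >> quick standalone check using its CSVFormat/CSVRecord API:)
>>> >>
>>> >>   import java.io.StringReader;
>>> >>   import org.apache.commons.csv.CSVFormat;
>>> >>   import org.apache.commons.csv.CSVRecord;
>>> >>
>>> >>   public class EmbeddedNewlineDemo {
>>> >>     public static void main(String[] args) throws Exception {
>>> >>       String csv = "foo,bar,\"this has an\r\nembedded newline\",192928\r\n";
>>> >>       for (CSVRecord rec : CSVFormat.RFC4180.parse(new StringReader(csv))) {
>>> >>         // One record, four fields, newline preserved inside field 3.
>>> >>         System.out.println(rec.size() + " fields; third = " + rec.get(2));
>>> >>       }
>>> >>     }
>>> >>   }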
>>>
>>> >> As for the other feedback:
>>>
>>> >> 1. Claiming the entire range - yes, I figured this was a major
>>> >> mistake. Thanks for the confirmation.
>>> >> 2. The code for initial splitting of the restriction seems very
>>> >> complex...
>>>
>>> >> Follow-up question: if I process (and claim) only a subset of a
>>> >> range, say [a, b - 100), and [b - 100, b) represents an incomplete
>>> >> block, will Beam SDF dynamically recombine ranges such that
>>> >> [b - 100, b + N) is sent to a worker with a (potentially) complete
>>> >> block?
>>>
>>> >> 3. Fine-tuning the evenness ... if Beam SDF re-combines ranges for
>>> >> split blocks, then it sounds like arbitrary splits in splitFunction()
>>> >> make more sense.
>>>
>>> >> I'll try to take another pass at this with your feedback in mind.
>>>
>>> >> Peter
>>>
>>>
>>>
>>> >> On Tue, Apr 24, 2018 at 3:08 PM, Eugene Kirpichov <[email protected]>
>>> >> wrote:
>>>
>>> >>> Hi Peter,
>>>
>>> >>> Thanks for experimenting with SDF! However, in this particular case:
>>> >>> any reason why you can't just use TextIO.read() and parse each line
>>> >>> as CSV? Seems like that would require considerably less code.
>>>
>>> >>> A few comments on this code per se:
>>> >>> - The ProcessElement implementation immediately claims the entire
>>> >>> range, which means that there can be no dynamic splitting and the
>>> >>> code behaves equivalently to a regular DoFn.
>>> >>> - The code for initial splitting of the restriction seems very
>>> >>> complex - can you just split it blindly into a bunch of byte ranges
>>> >>> of about equal size? Looking at the actual data while splitting
>>> >>> should never be necessary - you should be able to just look at the
>>> >>> file size (say, 100MB) and split it into a bunch of splits, say,
>>> >>> [0, 10MB), [10MB, 20MB) etc.
>>> >>> - It seems that the splitting code tries to align splits with record
>>> >>> boundaries - this is not useful: it does not matter whether the
>>> >>> split boundaries fall onto record boundaries or not; instead, the
>>> >>> reading code should be able to read an arbitrary range of bytes in
>>> >>> a meaningful way. That typically means that reading [a, b) means
>>> >>> "start at the first record boundary located at or after "a", end at
>>> >>> the first record boundary located at or after "b"" (see the sketch
>>> >>> after this list).
>>> >>> - Fine-tuning the evenness of initial splitting is also not useful:
>>> >>> dynamic splitting will even things out anyway; moreover, even if
>>> >>> you are able to achieve an equal amount of data read by different
>>> >>> restrictions, it does not translate into equal time to process the
>>> >>> data with the ParDos fused into the same bundle (and that time is
>>> >>> unpredictable).
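>>> >>>
>>> >>> That reading convention, as a rough sketch (the boundary and record
>>> >>> helpers are hypothetical stand-ins, not code from the gist):
>>> >>>
>>> >>>   // Process every record whose first byte falls in [a, b); the last
>>> >>>   // record may extend past b, and the reader of [b, ...) skips the
>>> >>>   // partial record at its front by seeking to the next boundary.
>>> >>>   void readRange(long a, long b) {
>>> >>>     long offset = firstRecordBoundaryAtOrAfter(a); // hypothetical
>>> >>>     while (offset < b) {
>>> >>>       Record record = readRecordAt(offset); // may read beyond b
>>> >>>       emit(record);
>>> >>>       offset = record.nextOffset();
>>> >>>     }
>>> >>>   }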
>>>
>>>
>>> >>> On Tue, Apr 24, 2018 at 1:24 PM Peter Brumblay <[email protected]>
>>> >>> wrote:
>>>
>>> >>>> Hi All,
>>>
>>> >>>> I noticed that there is no support for CSV file reading (e.g.
>>> >>>> RFC 4180) in Apache Beam - at least no native transform. There's an
>>> >>>> issue to add this support:
>>> >>>> https://issues.apache.org/jira/browse/BEAM-51.
>>>
>>> >>>> I've seen examples which use the Apache Commons CSV parser. I took
>>> >>>> a shot at implementing a SplittableDoFn transform. I have the full
>>> >>>> code and some questions in a gist here:
>>> >>>> https://gist.github.com/pbrumblay/9474dcc6cd238c3f1d26d869a20e863d.
>>>
>>> >>>> I suspect it could be improved quite a bit. If anyone has time to
>>> >>>> provide feedback, I would really appreciate it.
>>>
>>> >>>> Regards,
>>>
>>> >>>> Peter Brumblay
>>> >>>> Fearless Technology Group, Inc.
>>>
>>
