On Wed, Oct 11, 2023 at 6:55 AM Kenneth Knowles <k...@apache.org> wrote:

> So, top-posting because the threading got to be a lot for me and I think
> it forked a bit too... I may even be restating something someone said, so
> apologies for that.
>
> Very very good point about *required* parameters where if you don't use
> them then you will end up with two writers writing to the same file. The
> easiest example to work with might be if you omitted SHARD_NUM so all
> shards end up clobbering the same file.
>
> I think there's a unifying perspective between prefix/suffix and the need
> to be sure to include critical sharding variables. Essentially it is my
> point about it being a "big data fileset". It is perhaps unrealistic but
> ideally the user names the big data fileset and then the mandatory other
> pieces are added outside of their control. For example if I name my big
> data fileset "foo" then that implicitly means that "foo" consists of all
> the files named "foo/${SHARD_NUM}-of-${SHARD_TOTAL}". And yes now that I
> re-read I see you basically said the same thing. In some cases the required
> fields will include $WINDOW, $KEY, and $PANE_INDEX, yes? Even though the
> user can think of it as a textual template, if we can use a library that
> yields an abstract syntax tree for the expression we can easily check these
> requirements in a robust way - or we could do it in a non-robust way be
> string-scraping ourselves.
>

Yes. I think we are talking about the same thing. Users should not have
full control over the filename since that could lead to conflicts and data
loss when data is being written in parallel from multiple workers. Users
can refer to the big data fileset being written using the glob "<path>/**".
In addition users have control over the filename <prefix> and <suffix>
(file extension, for example) which can be useful for some downstream
use-cases. Rest of the filename will be filled out by the SDK (window, pane
etc.) to make sure that the files written by different workers do not
conflict.

Thanks,
Cham


>
> We actually are very close to this in FileIO. I think the interpretation
> of "prefix" is that it is the filename "foo" as above, and "suffix" is
> really something like ".txt" that you stick on the end of everything for
> whatever reason.
>
> Kenn
>
> On Tue, Oct 10, 2023 at 7:12 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>>
>>>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> I suspect some simple pattern templating would solve most use cases.
>>>>>> We probably would want to support timestamp formatting (e.g. $YYYY $M $D)
>>>>>> as well.
>>>>>>
>>>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <
>>>>>>> chamik...@google.com> wrote:
>>>>>>>
>>>>>>>> I would say:
>>>>>>>>
>>>>>>>>     sink:
>>>>>>>>       type: WriteToParquet
>>>>>>>>       config:
>>>>>>>>         path: /beam/filesytem/dest
>>>>>>>>         prefix: <my prefix>
>>>>>>>>         suffix: <my suffix>
>>>>>>>>
>>>>>>>> Underlying SDK will add the middle part of the file names to make
>>>>>>>> sure that files generated by various bundles/windows/shards do not 
>>>>>>>> conflict.
>>>>>>>>
>>>>>>>
>>>>>>> What's the relationship between path and prefix? Is path the
>>>>>>> directory part of the full path, or does prefix precede it?
>>>>>>>
>>>>>>
>>>>> prefix would be the first part of the file name so each shard will be
>>>>> named.
>>>>> <path>/<prefix>-<shard components added by the runner>-<suffix>
>>>>>
>>>>> This is similar to what we do in existing SDKS. For example, Java
>>>>> FileIO,
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>>>>>
>>>>
>>>> Yeah, although there's no distinction between path and prefix.
>>>>
>>>
>>> Ah, for FIleIO, path comes from the "to" call.
>>>
>>>
>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125
>>>
>>>
>>
>> Ah. I guess there's some inconsistency here, e.g. text files are written
>> to a filenamePrefix that subsumes both:
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String-
>>
>>
>>>
>>>>
>>>>>>>
>>>>>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>>>>>> customizing the file pattern sounds like a more advanced use case that 
>>>>>>>> can
>>>>>>>> be left for "real" SDKs.
>>>>>>>>
>>>>>>>
>>>>>>> Yea, we don't have to do everything.
>>>>>>>
>>>>>>>
>>>>>>>> For dynamic destinations, I think just making the "path" component
>>>>>>>> support  a lambda that is parameterized by the input should be adequate
>>>>>>>> since this allows customers to direct files written to different
>>>>>>>> destination directories.
>>>>>>>>
>>>>>>>>     sink:
>>>>>>>>       type: WriteToParquet
>>>>>>>>       config:
>>>>>>>>         path: <destination lambda>
>>>>>>>>         prefix: <my prefix>
>>>>>>>>         suffix: <my suffix>
>>>>>>>>
>>>>>>>> I'm not sure what would be the best way to specify a lambda here
>>>>>>>> though. Maybe a regex or the name of a Python callable ?
>>>>>>>>
>>>>>>>
>>>>>>> I'd rather not require Python for a pure Java pipeline, but some
>>>>>>> kind of a pattern template may be sufficient here.
>>>>>>>
>>>>>>>
>>>>>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> .On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>>>>>> DynamicDestinations were parameterized via a lambda instead of just 
>>>>>>>>>> having
>>>>>>>>>> the user add it via MapElements is performance. We discussed 
>>>>>>>>>> something
>>>>>>>>>> along the lines of what you are suggesting (essentially having the 
>>>>>>>>>> user
>>>>>>>>>> create a KV where the key contained the dynamic information). The 
>>>>>>>>>> problem
>>>>>>>>>> was that often the size of the generated filepath was often much 
>>>>>>>>>> larger
>>>>>>>>>> (sometimes by 2 OOM) than the information in the record, and there 
>>>>>>>>>> was a
>>>>>>>>>> desire to avoid record blowup. e.g. the record might contain a single
>>>>>>>>>> integer userid, and the filepath prefix would then be
>>>>>>>>>> /long/path/to/output/users/<id>. This was especially bad in cases 
>>>>>>>>>> where the
>>>>>>>>>> data had to be shuffled, and the existing dynamic destinations method
>>>>>>>>>> allowed extracting the filepath only _after_  the shuffle.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> That is a consideration I hadn't thought much of, thanks for
>>>>>>>>> bringing this up.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Now there may not be any good way to keep this benefit in a
>>>>>>>>>> declarative approach such as YAML (or at least a good easy way - we 
>>>>>>>>>> could
>>>>>>>>>> always allow the user to pass in a SQL expression to extract the 
>>>>>>>>>> filename
>>>>>>>>>> from the record!), but we should keep in mind that this might mean 
>>>>>>>>>> that
>>>>>>>>>> YAML-generated pipelines will be less efficient for certain use 
>>>>>>>>>> cases.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yep, it's not as straightforward to do in a declarative way. I
>>>>>>>>> would like to avoid mixing UDFs (with their associated languages and
>>>>>>>>> execution environments) if possible. Though I'd like the performance 
>>>>>>>>> of a
>>>>>>>>> "straightforward" YAML pipeline to be that which one can get writing
>>>>>>>>> straight-line Java (and possibly better, if we can leverage the 
>>>>>>>>> structure
>>>>>>>>> of schemas everywhere) this is not an absolute requirement for all
>>>>>>>>> features.
>>>>>>>>>
>>>>>>>>> I wonder if separating out a constant prefix vs. the dynamic stuff
>>>>>>>>> could be sufficient to mitigate the blow-up of pre-computing this in 
>>>>>>>>> most
>>>>>>>>> cases (especially in the context of a larger pipeline). Alternatively,
>>>>>>>>> rather than just a sharding pattern, one could have a full 
>>>>>>>>> filepattern that
>>>>>>>>> includes format parameters for dynamically computed bits as well as 
>>>>>>>>> the
>>>>>>>>> shard number, windowing info, etc. (There are pros and cons to this.)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>>>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Currently the various file writing configurations take a single
>>>>>>>>>>> parameter, path, which indicates where the (sharded) output should 
>>>>>>>>>>> be
>>>>>>>>>>> placed. In other words, one can write something like
>>>>>>>>>>>
>>>>>>>>>>>   pipeline:
>>>>>>>>>>>     ...
>>>>>>>>>>>     sink:
>>>>>>>>>>>       type: WriteToParquet
>>>>>>>>>>>       config:
>>>>>>>>>>>         path: /beam/filesytem/dest
>>>>>>>>>>>
>>>>>>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>>>>>>>
>>>>>>>>>>> Of course, in practice file writing is often much more
>>>>>>>>>>> complicated than this (especially when it comes to Streaming). For
>>>>>>>>>>> reference, I've included links to our existing offerings in the 
>>>>>>>>>>> various
>>>>>>>>>>> SDKs below. I'd like to start a discussion about what else should 
>>>>>>>>>>> go in the
>>>>>>>>>>> "config" parameter and how it should be expressed in YAML.
>>>>>>>>>>>
>>>>>>>>>>> The primary concern is around naming. This can generally be
>>>>>>>>>>> split into (1) the prefix, which must be provided by the users (2) 
>>>>>>>>>>> the
>>>>>>>>>>> sharing information, includes both shard counts (e.g. (the -X-of-N 
>>>>>>>>>>> suffix)
>>>>>>>>>>> but also windowing information (for streaming pipelines) which we 
>>>>>>>>>>> may want
>>>>>>>>>>> to allow the user to customize the formatting of, and (3) a suffix 
>>>>>>>>>>> like
>>>>>>>>>>> .json or .avro that is useful for both humans and tooling and can 
>>>>>>>>>>> often be
>>>>>>>>>>> inferred but should allow customization as well.
>>>>>>>>>>>
>>>>>>>>>>> An interesting case is that of dynamic destinations, where the
>>>>>>>>>>> prefix (or other parameters) may themselves be functions of the 
>>>>>>>>>>> records
>>>>>>>>>>> themselves. (I am excluding the case where the format itself is
>>>>>>>>>>> variable--such cases are probably better handled by explicitly 
>>>>>>>>>>> partitioning
>>>>>>>>>>> the data and doing multiple writes, as this introduces significant
>>>>>>>>>>> complexities and the set of possible formats is generally finite 
>>>>>>>>>>> and known
>>>>>>>>>>> ahead of time.) I propose that we leverage the fact that we have 
>>>>>>>>>>> structured
>>>>>>>>>>> data to be able to pull out these dynamic parameters. For example, 
>>>>>>>>>>> if we
>>>>>>>>>>> have an input data set with a string column my_col we could allow 
>>>>>>>>>>> something
>>>>>>>>>>> like
>>>>>>>>>>>
>>>>>>>>>>>   config:
>>>>>>>>>>>     path: {dynamic: my_col}
>>>>>>>>>>>
>>>>>>>>>>> which would pull this information out at runtime. (With the
>>>>>>>>>>> MapToFields transform, it is very easy to compute/append additional 
>>>>>>>>>>> fields
>>>>>>>>>>> to existing records.) Generally this field would then be stripped 
>>>>>>>>>>> from the
>>>>>>>>>>> written data, which would only see the subset of non-dynamically 
>>>>>>>>>>> referenced
>>>>>>>>>>> columns (though this could be configurable: we could add an 
>>>>>>>>>>> attribute like
>>>>>>>>>>> {dynamic: my_col, Keep: true} or require the set of columns to be 
>>>>>>>>>>> actually
>>>>>>>>>>> written (or elided) to be enumerated in the config or allow/require 
>>>>>>>>>>> the
>>>>>>>>>>> actual data to be written to be in a designated field of the "full" 
>>>>>>>>>>> input
>>>>>>>>>>> records as arranged by a preceding transform). It'd be great to get
>>>>>>>>>>> input/impressions from a wide range of people here on what would be 
>>>>>>>>>>> the
>>>>>>>>>>> most natural. Often just writing out snippets of various 
>>>>>>>>>>> alternatives can
>>>>>>>>>>> be quite informative (though I'm avoiding putting them here for the 
>>>>>>>>>>> moment
>>>>>>>>>>> to avoid biasing ideas right off the bat).
>>>>>>>>>>>
>>>>>>>>>>> For streaming pipelines it is often essential to write data out
>>>>>>>>>>> in a time-partitioned manner. The typical way to do this is to add 
>>>>>>>>>>> the
>>>>>>>>>>> windowing information into the shard specification itself, and a 
>>>>>>>>>>> (set of)
>>>>>>>>>>> file(s) is written on each window closing. Beam YAML already 
>>>>>>>>>>> supports any
>>>>>>>>>>> transform being given a "windowing" configuration which will cause a
>>>>>>>>>>> WindowInto transform to be applied to its input(s) before 
>>>>>>>>>>> application which
>>>>>>>>>>> can sit naturally on a sink. We may want to consider if 
>>>>>>>>>>> non-windowed writes
>>>>>>>>>>> make sense as well (though how this interacts with the watermark and
>>>>>>>>>>> underlying implementations are a large open question, so this is a 
>>>>>>>>>>> larger
>>>>>>>>>>> change that might make sense to defer).
>>>>>>>>>>>
>>>>>>>>>>> Note that I am explicitly excluding "coders" here. All data in
>>>>>>>>>>> YAML should be schema'd, and writers should know how to write this
>>>>>>>>>>> structured data. We may want to allow a "schema" field to allow a 
>>>>>>>>>>> user to
>>>>>>>>>>> specify the desired schema in a manner compatible with the sink 
>>>>>>>>>>> format
>>>>>>>>>>> itself (e.g. avro, json, whatever) that could be used both for 
>>>>>>>>>>> validation
>>>>>>>>>>> and possibly resolving ambiguities (e.g. if the sink has an enum 
>>>>>>>>>>> format
>>>>>>>>>>> that is not expressed in the schema of the input PCollection).
>>>>>>>>>>>
>>>>>>>>>>> Some other configuration options are that some formats
>>>>>>>>>>> (especially text-based ones) allow for specification of an external
>>>>>>>>>>> compression type (which may be inferable from the suffix), whether 
>>>>>>>>>>> to write
>>>>>>>>>>> a single shard if the input collection is empty or no shards at all 
>>>>>>>>>>> (an
>>>>>>>>>>> occasional user request that's supported for some Beam sinks now), 
>>>>>>>>>>> whether
>>>>>>>>>>> to allow fixed sharing (generally discouraged, as it disables 
>>>>>>>>>>> things like
>>>>>>>>>>> automatic shading based on input size, let alone dynamic work 
>>>>>>>>>>> rebalancing,
>>>>>>>>>>> though sometimes this is useful if the input is known to be small 
>>>>>>>>>>> and a
>>>>>>>>>>> single output is desired regardless of the restriction in 
>>>>>>>>>>> parallelism), or
>>>>>>>>>>> other sharding parameters (e.g. limiting the number of total 
>>>>>>>>>>> elements or
>>>>>>>>>>> (approximately) total number of bytes per output shard). Some of 
>>>>>>>>>>> these
>>>>>>>>>>> options may not be available/implemented for all 
>>>>>>>>>>> formats--consideration
>>>>>>>>>>> should be given as to how to handle this inconsistency (runtime 
>>>>>>>>>>> errors for
>>>>>>>>>>> unsupported combinations or simply not allowing them on any until 
>>>>>>>>>>> all are
>>>>>>>>>>> supported).
>>>>>>>>>>>
>>>>>>>>>>> A final consideration: we do not anticipate exposing the full
>>>>>>>>>>> complexity of Beam in the YAML offering. For advanced users using a 
>>>>>>>>>>> "real"
>>>>>>>>>>> SDK will often be preferable, and we intend to provide a migration 
>>>>>>>>>>> path
>>>>>>>>>>> from YAML to a language of your choice (codegen) as a migration 
>>>>>>>>>>> path. So we
>>>>>>>>>>> should balance simplicity with completeness and utility here.
>>>>>>>>>>>
>>>>>>>>>>> Sure, we could just pick something, but given that the
>>>>>>>>>>> main point of YAML is not capability, but expressibility and 
>>>>>>>>>>> ease-of-use, I
>>>>>>>>>>> think it's worth trying to get the expression of these concepts 
>>>>>>>>>>> right. I'm
>>>>>>>>>>> sure many of you have written a pipeline to files at some point in 
>>>>>>>>>>> time;
>>>>>>>>>>> I'd welcome any thoughts anyone has on the matter.
>>>>>>>>>>>
>>>>>>>>>>> - Robert
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> P.S. A related consideration: how should we consider the plain
>>>>>>>>>>> Read (where that file pattern is given at pipeline construction) 
>>>>>>>>>>> from the
>>>>>>>>>>> ReadAll variants? Should they be separate transforms, or should we 
>>>>>>>>>>> instead
>>>>>>>>>>> allow the same named transform (e.g. ReadFromParquet) support both 
>>>>>>>>>>> modes,
>>>>>>>>>>> depending on whether an input PCollection or explicit file path is 
>>>>>>>>>>> given
>>>>>>>>>>> (the two being mutually exclusive, with exactly one required, and 
>>>>>>>>>>> good
>>>>>>>>>>> error messaging of course)?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Java:
>>>>>>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>>>>>>>> Python:
>>>>>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>>>>>>>> Go:
>>>>>>>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>>>>>>>> Typescript:
>>>>>>>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>>>>>>>> Scio:
>>>>>>>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
>>>>>>>>>>>
>>>>>>>>>>>

Reply via email to