Another perspective:

We should focus on the fact that FileIO writes what I would call a "big
file-based dataset" to a filesystem. The primary characteristic of a "big
file-based dataset" is that it is sharded and that the shards should not
have any individual distinctiveness. The dataset should be read and written
as a whole, and the shards are an implementation detail for performance.

This impacts dynamic destinations in two ways that I can think of right away:

 - It is critical by definition to be able to refer to a whole "big
file-based dataset" as a whole thing. The most obvious way would be for it
to exist as a folder or folder-like grouping of files, and you glob
everything underneath that. But the hard requirement is that there is
*some* way to refer to the dataset as a single entity (via glob, basically).

 - When using "dynamic destinations" style functionality, each of the
dynamic destinations is a "big file-based dataset". The purpose is to route
a datum to the correct one of these, NOT to route the datum to a particular
file (which would be just some anonymous shard in the dataset).

So having really fine-grained control over filenames is likely to result in
anti-patterns of malformed datasets that cannot be easily globbed or, in
the converse case, well-formed datasets that have suboptimal sharding
because it was manually managed.
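
To make the distinction concrete, here is a minimal in-memory sketch (not
Beam code). The names dataset_for, write_all and read_dataset, the
"region" field, and the part-X-of-N shard naming are all made up for
illustration. The user only decides which dataset a record belongs to;
shard names are generated, and reading is always a glob over the whole
dataset.

```python
import fnmatch


def dataset_for(record, root="/out"):
    """Route a record to a dataset (a directory), never to a specific file."""
    return f"{root}/region={record['region']}"


def write_all(records, num_shards=2):
    """Return {shard_path: [records]} with opaque, generated shard names.

    Shards are assigned by position here only to keep the sketch small;
    a real runner would pick the sharding itself.
    """
    files = {}
    for i, record in enumerate(records):
        dataset = dataset_for(record)
        shard = f"{dataset}/part-{i % num_shards:05d}-of-{num_shards:05d}"
        files.setdefault(shard, []).append(record)
    return files


def read_dataset(files, dataset):
    """Read a dataset as a whole by globbing everything underneath it."""
    pattern = f"{dataset}/*"
    return [
        record
        for path in sorted(files)
        if fnmatch.fnmatchcase(path, pattern)
        for record in files[path]
    ]
```

Nothing in the user-facing part (dataset_for) mentions a file name, so
every dataset stays globbable and the sharding stays free to change.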

I know that "reality" is not this simple, because people have accumulated
files they have to work with as-is, where they probably didn't plan for
this way of thinking when they were gathering the files. We need to give
good options for everyone, but the golden path should be the simple and
good case.

Kenn

On Tue, Oct 10, 2023 at 10:09 AM Kenneth Knowles <k...@apache.org> wrote:

> Since I've been in GHA files lately...
>
> I think they have a very useful pattern which we could borrow from or
> learn from, where setting up the variables happens separately, like
> https://github.com/apache/beam/blob/57821c191d322f9f21c01a34c55e0c40eda44f1e/.github/workflows/build_release_candidate.yml#L270
>
> If we called the section "vars", then the config could use the vars in
> the destination. I'm making this example deliberately a little gross:
>
>  - vars:
>     - USER_REGION: $.user.metadata.region
>     - USER_GROUP: $.user.groups[0].name
>  - config:
>     - path: gs://output-bucket-${vars.USER_REGION}/files/${vars.USER_GROUP}-${fileio.SHARD_NUM}-${fileio.WINDOW}
>
> I think it strikes a good balance between arbitrary lambdas and just a
> prefix/suffix control, giving a really easy place where we can say "the
> whole value of this YAML field is a path expression into the structured
> data"
>
> Kenn
>
> On Mon, Oct 9, 2023 at 6:09 PM Chamikara Jayalath via dev <
> dev@beam.apache.org> wrote:
>
>> I would say:
>>
>>     sink:
>>       type: WriteToParquet
>>       config:
>>         path: /beam/filesystem/dest
>>         prefix: <my prefix>
>>         suffix: <my suffix>
>>
>> Underlying SDK will add the middle part of the file names to make sure
>> that files generated by various bundles/windows/shards do not conflict.
>>
>> This will satisfy the vast majority of use-cases I believe. Fully
>> customizing the file pattern sounds like a more advanced use case that can
>> be left for "real" SDKs.
>>
>> For dynamic destinations, I think just making the "path" component
>> support a lambda that is parameterized by the input should be adequate
>> since this allows customers to direct files written to different
>> destination directories.
>>
>>     sink:
>>       type: WriteToParquet
>>       config:
>>         path: <destination lambda>
>>         prefix: <my prefix>
>>         suffix: <my suffix>
>>
>> I'm not sure what would be the best way to specify a lambda here though.
>> Maybe a regex or the name of a Python callable?
>>
>> Thanks,
>> Cham
>>
>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Just FYI - the reason why names (including prefixes) in
>>>> DynamicDestinations were parameterized via a lambda instead of just having
>>>> the user add it via MapElements is performance. We discussed something
>>>> along the lines of what you are suggesting (essentially having the user
>>>> create a KV where the key contained the dynamic information). The problem
>>>> was that the size of the generated filepath was often much larger
>>>> (sometimes by two orders of magnitude) than the information in the
>>>> record, and there was a desire to avoid record blowup. E.g. the record
>>>> might contain a single integer userid, and the filepath prefix would
>>>> then be /long/path/to/output/users/<id>. This was especially bad in
>>>> cases where the data had to be shuffled, and the existing dynamic
>>>> destinations method allowed extracting the filepath only _after_ the
>>>> shuffle.
>>>>
>>>
>>> That is a consideration I hadn't thought much of, thanks for
>>> bringing this up.
>>>
>>>
>>>> Now there may not be any good way to keep this benefit in a
>>>> declarative approach such as YAML (or at least a good easy way - we could
>>>> always allow the user to pass in a SQL expression to extract the filename
>>>> from the record!), but we should keep in mind that this might mean that
>>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>>
>>>
>>> Yep, it's not as straightforward to do in a declarative way. I would
>>> like to avoid mixing UDFs (with their associated languages and execution
>>> environments) if possible. Though I'd like the performance of a
>>> "straightforward" YAML pipeline to be that which one can get writing
>>> straight-line Java (and possibly better, if we can leverage the structure
>>> of schemas everywhere) this is not an absolute requirement for all
>>> features.
>>>
>>> I wonder if separating out a constant prefix vs. the dynamic stuff could
>>> be sufficient to mitigate the blow-up of pre-computing this in most cases
>>> (especially in the context of a larger pipeline). Alternatively, rather
>>> than just a sharding pattern, one could have a full filepattern that
>>> includes format parameters for dynamically computed bits as well as the
>>> shard number, windowing info, etc. (There are pros and cons to this.)
>>>
>>>
>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> Currently the various file writing configurations take a single
>>>>> parameter, path, which indicates where the (sharded) output should be
>>>>> placed. In other words, one can write something like
>>>>>
>>>>>   pipeline:
>>>>>     ...
>>>>>     sink:
>>>>>       type: WriteToParquet
>>>>>       config:
>>>>>         path: /beam/filesystem/dest
>>>>>
>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>
>>>>> Of course, in practice file writing is often much more complicated
>>>>> than this (especially when it comes to Streaming). For reference, I've
>>>>> included links to our existing offerings in the various SDKs below. I'd
>>>>> like to start a discussion about what else should go in the "config"
>>>>> parameter and how it should be expressed in YAML.
>>>>>
>>>>> The primary concern is around naming. This can generally be split into
>>>>> (1) the prefix, which must be provided by the user, (2) the sharding
>>>>> information, which includes both shard counts (e.g. the -X-of-N suffix)
>>>>> and windowing information (for streaming pipelines), whose formatting
>>>>> we may want to allow the user to customize, and (3) a suffix like .json
>>>>> or .avro that is useful for both humans and tooling and can often be
>>>>> inferred but should allow customization as well.
>>>>>
>>>>> An interesting case is that of dynamic destinations, where the prefix
>>>>> (or other parameters) may themselves be functions of the records
>>>>> themselves. (I am excluding the case where the format itself is
>>>>> variable--such cases are probably better handled by explicitly
>>>>> partitioning the data and doing multiple writes, as this introduces
>>>>> significant complexities and the set of possible formats is generally
>>>>> finite and known ahead of time.) I propose that we leverage the fact
>>>>> that we have structured data to be able to pull out these dynamic
>>>>> parameters. For example, if we have an input data set with a string
>>>>> column my_col we could allow something like
>>>>>
>>>>>   config:
>>>>>     path: {dynamic: my_col}
>>>>>
>>>>> which would pull this information out at runtime. (With the
>>>>> MapToFields transform, it is very easy to compute/append additional fields
>>>>> to existing records.) Generally this field would then be stripped from the
>>>>> written data, which would only see the subset of non-dynamically
>>>>> referenced columns (though this could be configurable: we could add an
>>>>> attribute like {dynamic: my_col, keep: true}, or require the set of
>>>>> columns to be actually written (or elided) to be enumerated in the
>>>>> config, or allow/require the actual data to be written to be in a
>>>>> designated field of the "full" input records as arranged by a preceding
>>>>> transform). It'd be great to get
>>>>> input/impressions from a wide range of people here on what would be the
>>>>> most natural. Often just writing out snippets of various alternatives can
>>>>> be quite informative (though I'm avoiding putting them here for the moment
>>>>> to avoid biasing ideas right off the bat).
>>>>>
>>>>> For streaming pipelines it is often essential to write data out in a
>>>>> time-partitioned manner. The typical way to do this is to add the
>>>>> windowing information into the shard specification itself, and a (set
>>>>> of) file(s) is written on each window closing. Beam YAML already
>>>>> supports any transform being given a "windowing" configuration, which
>>>>> causes a WindowInto transform to be applied to its input(s) before
>>>>> application, and which can sit naturally on a sink. We may want to
>>>>> consider if non-windowed writes make sense as well (though how this
>>>>> interacts with the watermark and underlying implementations is a large
>>>>> open question, so this is a larger change that might make sense to
>>>>> defer).
>>>>>
>>>>> Note that I am explicitly excluding "coders" here. All data in YAML
>>>>> should be schema'd, and writers should know how to write this structured
>>>>> data. We may want to allow a "schema" field to allow a user to specify the
>>>>> desired schema in a manner compatible with the sink format itself (e.g.
>>>>> avro, json, whatever) that could be used both for validation and possibly
>>>>> resolving ambiguities (e.g. if the sink has an enum format that is not
>>>>> expressed in the schema of the input PCollection).
>>>>>
>>>>> Some other configuration options are that some formats (especially
>>>>> text-based ones) allow for specification of an external compression type
>>>>> (which may be inferable from the suffix), whether to write a single shard
>>>>> if the input collection is empty or no shards at all (an occasional user
>>>>> request that's supported for some Beam sinks now), whether to allow fixed
>>>>> sharding (generally discouraged, as it disables things like automatic
>>>>> sharding based on input size, let alone dynamic work rebalancing, though
>>>>> sometimes this is useful if the input is known to be small and a single
>>>>> output is desired regardless of the restriction in parallelism), or other
>>>>> sharding parameters (e.g. limiting the number of total elements or
>>>>> (approximately) total number of bytes per output shard). Some of these
>>>>> options may not be available/implemented for all formats--consideration
>>>>> should be given as to how to handle this inconsistency (runtime errors for
>>>>> unsupported combinations or simply not allowing them on any until all are
>>>>> supported).
>>>>>
>>>>> A final consideration: we do not anticipate exposing the full
>>>>> complexity of Beam in the YAML offering. For advanced users, using a
>>>>> "real" SDK will often be preferable, and we intend to provide a
>>>>> migration path (codegen) from YAML to a language of your choice. So we
>>>>> should balance simplicity with completeness and utility here.
>>>>>
>>>>> Sure, we could just pick something, but given that the main point of
>>>>> YAML is not capability, but expressibility and ease-of-use, I think it's
>>>>> worth trying to get the expression of these concepts right. I'm sure many
>>>>> of you have written a pipeline to files at some point in time; I'd welcome
>>>>> any thoughts anyone has on the matter.
>>>>>
>>>>> - Robert
>>>>>
>>>>>
>>>>> P.S. A related consideration: how should we distinguish the plain Read
>>>>> (where the file pattern is given at pipeline construction) from the
>>>>> ReadAll variants? Should they be separate transforms, or should we instead
>>>>> allow the same named transform (e.g. ReadFromParquet) to support both modes,
>>>>> depending on whether an input PCollection or explicit file path is given
>>>>> (the two being mutually exclusive, with exactly one required, and good
>>>>> error messaging of course)?
>>>>>
>>>>>
>>>>> Java:
>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>> Python:
>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>> Go:
>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>> Typescript:
>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>> Scio:
>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
>>>>>
>>>>>
