Just FYI - the reason why names (including prefixes) in DynamicDestinations
were parameterized via a lambda, instead of just having the user add them via
MapElements, is performance. We discussed something along the lines of what
you are suggesting (essentially having the user create a KV where the key
contained the dynamic information). The problem was that the generated
filepath was often much larger (sometimes by two orders of magnitude) than
the information in the record, and there was a desire to avoid record blowup.
e.g. the record might contain a single integer userid, and the filepath
prefix would then be /long/path/to/output/users/<id>. This was especially
bad in cases where the data had to be shuffled, and the existing dynamic
destinations method allowed extracting the filepath only _after_ the
shuffle.

Now there may not be any good way to keep this benefit in a
declarative approach such as YAML (or at least not an easy one - we could
always allow the user to pass in a SQL expression to extract the filename
from the record!), but we should keep in mind that this might mean that
YAML-generated pipelines will be less efficient for certain use cases.
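
To sketch that last idea (the syntax here is entirely hypothetical, just to
illustrate how a SQL expression could defer path construction to write time):

  sink:
    type: WriteToParquet
    config:
      # Hypothetical field: evaluate an expression against each record at
      # the sink, after any shuffle, rather than materializing the full
      # path as a column beforehand.
      path:
        expression: "'/long/path/to/output/users/' || CAST(userid AS VARCHAR)"

Since the path would only be computed at write time, the large generated
string need never be part of the shuffled record.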

On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <dev@beam.apache.org>
wrote:

> Currently the various file writing configurations take a single parameter,
> path, which indicates where the (sharded) output should be placed. In other
> words, one can write something like
>
>   pipeline:
>     ...
>     sink:
>       type: WriteToParquet
>       config:
>         path: /beam/filesystem/dest
>
> and one gets files like "/beam/filesystem/dest-X-of-N"
>
> Of course, in practice file writing is often much more complicated than
> this (especially when it comes to Streaming). For reference, I've included
> links to our existing offerings in the various SDKs below. I'd like to
> start a discussion about what else should go in the "config" parameter and
> how it should be expressed in YAML.
>
> The primary concern is around naming. This can generally be split into (1)
> the prefix, which must be provided by the user, (2) the sharding
> information, which includes both shard counts (e.g. the -X-of-N suffix) and
> windowing information (for streaming pipelines), the formatting of which we
> may want to allow the user to customize, and (3) a suffix like .json or
> .avro that is useful for both humans and tooling and can often be inferred,
> but should allow customization as well.
>
> An interesting case is that of dynamic destinations, where the prefix (or
> other parameters) may be a function of the records themselves. (I
> am excluding the case where the format itself is variable--such cases are
> probably better handled by explicitly partitioning the data and doing
> multiple writes, as this introduces significant complexities and the set of
> possible formats is generally finite and known ahead of time.) I propose
> that we leverage the fact that we have structured data to be able to pull
> out these dynamic parameters. For example, if we have an input data set
> with a string column my_col we could allow something like
>
>   config:
>     path: {dynamic: my_col}
>
> which would pull this information out at runtime. (With the MapToFields
> transform, it is very easy to compute/append additional fields to existing
> records.) Generally this field would then be stripped from the written
> data, which would only see the subset of non-dynamically referenced columns
> (though this could be configurable: we could add an attribute like
> {dynamic: my_col, keep: true}, require the set of columns to actually be
> written (or elided) to be enumerated in the config, or allow/require the
> actual data to be written to be placed in a designated field of the "full"
> input records by a preceding transform). It'd be great to get
> input/impressions from a wide range of people here on what would be the
> most natural. Often just writing out snippets of various alternatives can
> be quite informative (though I'm avoiding putting them here for the moment
> to avoid biasing ideas right off the bat).
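>
> To make the base proposal concrete (without biasing those alternatives -
> the MapToFields syntax exists today, while {dynamic: ...} is the tentative
> part), the flow might look something like
>
>   - type: MapToFields
>     config:
>       append: true
>       language: python
>       fields:
>         my_col: "'/long/path/to/output/users/' + str(userid)"
>   - type: WriteToParquet
>     config:
>       path: {dynamic: my_col}
>
> where the sink would strip my_col before writing.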
>
> For streaming pipelines it is often essential to write data out in a
> time-partitioned manner. The typical way to do this is to add the windowing
> information into the shard specification itself, and a (set of) file(s) is
> written on each window closing. Beam YAML already supports giving any
> transform a "windowing" configuration, which causes a WindowInto transform
> to be applied to its input(s) before application; this sits naturally on a
> sink. We may want to consider whether non-windowed writes make sense as
> well (though how this interacts with the watermark and underlying
> implementations is a large open question, so this is a larger change that
> might make sense to defer).
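>
> For example, using the existing windowing configuration (with the shard
> naming assumed to incorporate the window, which is the part in question):
>
>   sink:
>     type: WriteToParquet
>     windowing:
>       type: fixed
>       size: 60s
>     config:
>       path: /beam/filesystem/dest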
>
> Note that I am explicitly excluding "coders" here. All data in YAML should
> be schema'd, and writers should know how to write this structured data. We
> may want to allow a "schema" field to allow a user to specify the desired
> schema in a manner compatible with the sink format itself (e.g. avro, json,
> whatever) that could be used both for validation and possibly resolving
> ambiguities (e.g. if the sink has an enum format that is not expressed in
> the schema of the input PCollection).
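>
> e.g. one could imagine a (hypothetical) "schema" field holding an Avro
> schema, which could disambiguate an enum not expressible in the input's
> Beam schema:
>
>   config:
>     path: /beam/filesystem/dest
>     schema: |
>       {"type": "record", "name": "User", "fields": [
>         {"name": "userid", "type": "int"},
>         {"name": "status",
>          "type": {"type": "enum", "name": "Status",
>                   "symbols": ["ACTIVE", "INACTIVE"]}}]}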
>
> Some other configuration options: some formats (especially text-based
> ones) allow specification of an external compression type (which may be
> inferable from the suffix); whether to write a single shard or no shards
> at all if the input collection is empty (an occasional user request that's
> supported for some Beam sinks now); whether to allow fixed sharding
> (generally discouraged, as it disables things like automatic sharding
> based on input size, let alone dynamic work rebalancing, though it is
> sometimes useful if the input is known to be small and a single output
> file is desired despite the restriction in parallelism); or other sharding
> parameters (e.g. limiting the total number of elements or (approximately)
> the total number of bytes per output shard). Some of these options may not
> be available/implemented for all formats--consideration should be given as
> to how to handle this inconsistency (runtime errors for unsupported
> combinations, or simply not allowing them on any format until all are
> supported).
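>
> Spelled out (again, option names purely illustrative), these might look
> like
>
>   config:
>     path: /beam/filesystem/dest
>     compression: gzip            # often inferable from a .gz suffix
>     write_empty_shard: false     # emit no files at all on empty input
>     num_shards: 1                # fixed sharding, generally discouraged
>     max_bytes_per_shard: 268435456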
>
> A final consideration: we do not anticipate exposing the full complexity
> of Beam in the YAML offering. For advanced users, using a "real" SDK will
> often be preferable, and we intend to provide a migration path from YAML
> to a language of your choice (via codegen). So we should balance
> simplicity with completeness and utility here.
>
> Sure, we could just pick something, but given that the main point of YAML
> is not capability, but expressibility and ease-of-use, I think it's worth
> trying to get the expression of these concepts right. I'm sure many of you
> have written a pipeline to files at some point in time; I'd welcome any
> thoughts anyone has on the matter.
>
> - Robert
>
>
> P.S. A related consideration: how should we distinguish the plain Read
> (where the file pattern is given at pipeline construction) from the ReadAll
> variants? Should they be separate transforms, or should we instead allow
> the same named transform (e.g. ReadFromParquet) to support both modes,
> depending on whether an input PCollection or an explicit file path is given
> (the two being mutually exclusive, with exactly one required, and good
> error messaging of course)?
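>
> i.e. (sketch only, assuming one transform supporting both modes)
>
>   # Read mode: the pattern is known at construction time.
>   - type: ReadFromParquet
>     config:
>       path: /beam/filesystem/input-*.parquet
>
>   # ReadAll mode: patterns arrive on an input PCollection.
>   - type: ReadFromParquet
>     input: ComputeFilePatterns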
>
>
> Java:
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
> Python:
> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
> Go:
> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
> Typescript:
> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
> Scio:
> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
>
>
