I would say:

    sink:
      type: WriteToParquet
      config:
        path: /beam/filesytem/dest
        prefix: <my prefix>
        suffix: <my suffix>

Underlying SDK will add the middle part of the file names to make sure that
files generated by various bundles/windows/shards do not conflict.

This will satisfy the vast majority of use-cases I believe. Fully
customizing the file pattern sounds like a more advanced use case that can
be left for "real" SDKs.

For dynamic destinations, I think just making the "path" component support
a lambda that is parameterized by the input should be adequate since this
allows customers to direct files written to different destination
directories.

    sink:
      type: WriteToParquet
      config:
        path: <destination lambda>
        prefix: <my prefix>
        suffix: <my suffix>

I'm not sure what would be the best way to specify a lambda here though.
Maybe a regex or the name of a Python callable ?

Thanks,
Cham










On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <dev@beam.apache.org>
wrote:

> .On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
>
>> Just FYI - the reason why names (including prefixes) in
>> DynamicDestinations were parameterized via a lambda instead of just having
>> the user add it via MapElements is performance. We discussed something
>> along the lines of what you are suggesting (essentially having the user
>> create a KV where the key contained the dynamic information). The problem
>> was that often the size of the generated filepath was often much larger
>> (sometimes by 2 OOM) than the information in the record, and there was a
>> desire to avoid record blowup. e.g. the record might contain a single
>> integer userid, and the filepath prefix would then be
>> /long/path/to/output/users/<id>. This was especially bad in cases where the
>> data had to be shuffled, and the existing dynamic destinations method
>> allowed extracting the filepath only _after_  the shuffle.
>>
>
> That is a consideration I hadn't thought much of, thanks for bringing this
> up.
>
>
>> Now there may not be any good way to keep this benefit in a
>> declarative approach such as YAML (or at least a good easy way - we could
>> always allow the user to pass in a SQL expression to extract the filename
>> from the record!), but we should keep in mind that this might mean that
>> YAML-generated pipelines will be less efficient for certain use cases.
>>
>
> Yep, it's not as straightforward to do in a declarative way. I would like
> to avoid mixing UDFs (with their associated languages and execution
> environments) if possible. Though I'd like the performance of a
> "straightforward" YAML pipeline to be that which one can get writing
> straight-line Java (and possibly better, if we can leverage the structure
> of schemas everywhere) this is not an absolute requirement for all
> features.
>
> I wonder if separating out a constant prefix vs. the dynamic stuff could
> be sufficient to mitigate the blow-up of pre-computing this in most cases
> (especially in the context of a larger pipeline). Alternatively, rather
> than just a sharding pattern, one could have a full filepattern that
> includes format parameters for dynamically computed bits as well as the
> shard number, windowing info, etc. (There are pros and cons to this.)
>
>
>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Currently the various file writing configurations take a single
>>> parameter, path, which indicates where the (sharded) output should be
>>> placed. In other words, one can write something like
>>>
>>>   pipeline:
>>>     ...
>>>     sink:
>>>       type: WriteToParquet
>>>       config:
>>>         path: /beam/filesytem/dest
>>>
>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>
>>> Of course, in practice file writing is often much more complicated than
>>> this (especially when it comes to Streaming). For reference, I've included
>>> links to our existing offerings in the various SDKs below. I'd like to
>>> start a discussion about what else should go in the "config" parameter and
>>> how it should be expressed in YAML.
>>>
>>> The primary concern is around naming. This can generally be split into
>>> (1) the prefix, which must be provided by the users (2) the sharing
>>> information, includes both shard counts (e.g. (the -X-of-N suffix) but also
>>> windowing information (for streaming pipelines) which we may want to allow
>>> the user to customize the formatting of, and (3) a suffix like .json or
>>> .avro that is useful for both humans and tooling and can often be inferred
>>> but should allow customization as well.
>>>
>>> An interesting case is that of dynamic destinations, where the prefix
>>> (or other parameters) may themselves be functions of the records
>>> themselves. (I am excluding the case where the format itself is
>>> variable--such cases are probably better handled by explicitly partitioning
>>> the data and doing multiple writes, as this introduces significant
>>> complexities and the set of possible formats is generally finite and known
>>> ahead of time.) I propose that we leverage the fact that we have structured
>>> data to be able to pull out these dynamic parameters. For example, if we
>>> have an input data set with a string column my_col we could allow something
>>> like
>>>
>>>   config:
>>>     path: {dynamic: my_col}
>>>
>>> which would pull this information out at runtime. (With the MapToFields
>>> transform, it is very easy to compute/append additional fields to existing
>>> records.) Generally this field would then be stripped from the written
>>> data, which would only see the subset of non-dynamically referenced columns
>>> (though this could be configurable: we could add an attribute like
>>> {dynamic: my_col, Keep: true} or require the set of columns to be actually
>>> written (or elided) to be enumerated in the config or allow/require the
>>> actual data to be written to be in a designated field of the "full" input
>>> records as arranged by a preceding transform). It'd be great to get
>>> input/impressions from a wide range of people here on what would be the
>>> most natural. Often just writing out snippets of various alternatives can
>>> be quite informative (though I'm avoiding putting them here for the moment
>>> to avoid biasing ideas right off the bat).
>>>
>>> For streaming pipelines it is often essential to write data out in a
>>> time-partitioned manner. The typical way to do this is to add the windowing
>>> information into the shard specification itself, and a (set of) file(s) is
>>> written on each window closing. Beam YAML already supports any transform
>>> being given a "windowing" configuration which will cause a WindowInto
>>> transform to be applied to its input(s) before application which can sit
>>> naturally on a sink. We may want to consider if non-windowed writes make
>>> sense as well (though how this interacts with the watermark and underlying
>>> implementations are a large open question, so this is a larger change that
>>> might make sense to defer).
>>>
>>> Note that I am explicitly excluding "coders" here. All data in YAML
>>> should be schema'd, and writers should know how to write this structured
>>> data. We may want to allow a "schema" field to allow a user to specify the
>>> desired schema in a manner compatible with the sink format itself (e.g.
>>> avro, json, whatever) that could be used both for validation and possibly
>>> resolving ambiguities (e.g. if the sink has an enum format that is not
>>> expressed in the schema of the input PCollection).
>>>
>>> Some other configuration options are that some formats (especially
>>> text-based ones) allow for specification of an external compression type
>>> (which may be inferable from the suffix), whether to write a single shard
>>> if the input collection is empty or no shards at all (an occasional user
>>> request that's supported for some Beam sinks now), whether to allow fixed
>>> sharing (generally discouraged, as it disables things like automatic
>>> shading based on input size, let alone dynamic work rebalancing, though
>>> sometimes this is useful if the input is known to be small and a single
>>> output is desired regardless of the restriction in parallelism), or other
>>> sharding parameters (e.g. limiting the number of total elements or
>>> (approximately) total number of bytes per output shard). Some of these
>>> options may not be available/implemented for all formats--consideration
>>> should be given as to how to handle this inconsistency (runtime errors for
>>> unsupported combinations or simply not allowing them on any until all are
>>> supported).
>>>
>>> A final consideration: we do not anticipate exposing the full complexity
>>> of Beam in the YAML offering. For advanced users using a "real" SDK will
>>> often be preferable, and we intend to provide a migration path from YAML to
>>> a language of your choice (codegen) as a migration path. So we should
>>> balance simplicity with completeness and utility here.
>>>
>>> Sure, we could just pick something, but given that the main point of
>>> YAML is not capability, but expressibility and ease-of-use, I think it's
>>> worth trying to get the expression of these concepts right. I'm sure many
>>> of you have written a pipeline to files at some point in time; I'd welcome
>>> any thoughts anyone has on the matter.
>>>
>>> - Robert
>>>
>>>
>>> P.S. A related consideration: how should we consider the plain Read
>>> (where that file pattern is given at pipeline construction) from the
>>> ReadAll variants? Should they be separate transforms, or should we instead
>>> allow the same named transform (e.g. ReadFromParquet) support both modes,
>>> depending on whether an input PCollection or explicit file path is given
>>> (the two being mutually exclusive, with exactly one required, and good
>>> error messaging of course)?
>>>
>>>
>>> Java:
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>> Python:
>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>> Go:
>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>> Typescript:
>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>> Scio:
>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
>>>
>>>

Reply via email to