On Tue, Oct 22, 2024 at 11:46 AM Danny McCormick
<dannymccorm...@google.com> wrote:
>
> > (1a) Provide a special operation "Unnest" that takes a single field
> > and emits it as the top-level element. This can of course result in
> > unschema'd PCollections (which are supported, but generally don't play
> > as well with the other operations, including xlang ones).
>
> I like this the most out of the options - why does it have to be unschema'd 
> though? Couldn't we retain that information from previous steps? If not, I 
> don't see a way around losing schema info.

Yes, if the unnested element itself is schema'd, that is preserved. If
it's, say, an int, it will be a bare PCollection of ints. (Which isn't
the end of the world...)
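
To make the two cases concrete, here is a minimal sketch in plain Python
(not Beam code). ErrorRow, Point, and unnest are hypothetical names used
only for illustration, and the msg field is an assumption about what the
error metadata looks like. It only shows that pulling out a single field
keeps whatever structure that field already had.

```python
from typing import Any, NamedTuple


class ErrorRow(NamedTuple):
    """Hypothetical stand-in for a row from an error_handling output."""
    element: Any  # the original element that failed
    msg: str      # assumed metadata field describing the error


class Point(NamedTuple):
    """Hypothetical schema'd element with two named fields."""
    x: int
    y: int


def unnest(rows, field):
    """Emit the value of `field` from each row as the top-level element."""
    return [getattr(row, field) for row in rows]


# The nested element has named fields, so the result keeps them.
schemad = unnest([ErrorRow(Point(1, 2), "bad point")], "element")

# The nested element is a plain int, so the result is a bare list of ints.
bare = unnest([ErrorRow(-1, "math domain error")], "element")

print(schemad)
print(bare)
```

In the first case the output elements still carry their field names; in
the second there is nothing schema-like left to preserve.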

Naming is also still TBD. I just realized that unnest has the meaning
of iteration/flatten in some SQL dialects. For our dynamic
destinations we chose the keyword "only" to indicate that we want to
write only a specified field (as a top-level record) rather than the
entire record.

> On Tue, Oct 22, 2024 at 1:50 PM Robert Bradshaw via dev <dev@beam.apache.org> 
> wrote:
>>
>> On Sat, Oct 19, 2024 at 8:01 AM XQ Hu via dev <dev@beam.apache.org> wrote:
>> >
>> > I probably missed something. Tried this toy example:
>> >
>> > pipeline:
>> >   transforms:
>> >     - type: Create
>> >       config:
>> >         elements: [1, 2, 3, 4, -1]
>> >     - type: MapToFields
>> >       input: Create
>> >       name: MapToFields_1
>> >       config:
>> >         language: python
>> >         fields:
>> >           element:
>> >             callable: |
>> >               import math
>> >               def process_num(row):
>> >                 return math.sqrt(row.element)
>> >         error_handling:
>> >           output: my_error_output
>> >     - type: LogForTesting
>> >       input: MapToFields_1
>> >     - type: MapToFields
>> >       input: MapToFields_1.my_error_output
>> >       name: MapToFields_2
>> >       config:
>> >         language: python
>> >         fields:
>> >           element:
>> >             callable: |
>> >               # return the raw element
>> >               def process_error_row(row):
>> >                 return row.element[0]
>> >     - type: LogForTesting
>> >       input: MapToFields_2
>> >
>> > It looks like MapToFields is good enough to get any information returned 
>> > by error_handling.
>>
>> Yes, it's possible, but your MapToFields_2 needs to know (and
>> reproduce) the structure of the original element to reconstruct it.
>>
>> > On Sat, Oct 19, 2024 at 2:55 AM Ahmed Abualsaud via dev 
>> > <dev@beam.apache.org> wrote:
>> >>
>> >> Another option is to add a second DLQ that outputs just the original 
>> >> rows, i.e. the user has the option to fetch failed rows with or without 
>> >> metadata.
>> >> It would take some work on our side to add this second DLQ to existing 
>> >> transforms, but that seems pretty straightforward.
>>
>> Yeah. I would prefer to do it in such a way that one didn't have to
>> modify all existing (and future) transforms. Another downside is that
>> having two error outputs doesn't play as nicely with error handlers
>> (https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.html
>> ).
>>
>> Yet another option would be to add a yaml StripErrorMetadata
>> transform, as this is the place where it's not convenient to just do a
>> map.
>>
>> >> On Sat, Oct 19, 2024 at 1:03 AM Robert Bradshaw via dev 
>> >> <dev@beam.apache.org> wrote:
>> >>>
>> >>> I came across an interesting user report at
>> >>> https://github.com/apache/beam/issues/32866 which made me realize that,
>> >>> while providing metadata about a bad element in the "bad records" output
>> >>> is useful, we don't make it easy to extract the output into a PCollection
>> >>> of the original elements. The output schema contains the original
>> >>> element as well as metadata about what error occurred, and in an
>> >>> ordinary Beam pipeline one could easily apply a Map(lambda error_row:
>> >>> error_row.element) but YAML doesn't have Map, just MapToFields
>> >>> (primarily to be more schema friendly).
>> >>>
>> >>> There are a couple of options:
>> >>>
>> >>> (0) Leave things as they are. One can write
>> >>>
>> >>> type: MapToFields
>> >>> config:
>> >>>   fields:
>> >>>     fld1: element.fld1
>> >>>     fld2: element.fld2
>> >>>     ...
>> >>>
>> >>>
>> >>> This is of course a bit ugly as one needs to enumerate (and know) the
>> >>> set of original fields.
>> >>>
>> >>> (1a) Provide a special operation "Unnest" that takes a single field
>> >>> and emits it as the top-level element. This can of course result in
>> >>> unschema'd PCollections (which are supported, but generally don't play
>> >>> as well with the other operations, including xlang ones).
>> >>>
>> >>> (1b) Just provide a Map. This is a generalization of 1a, but on the
>> >>> other hand would be more prone to abuse.
>> >>>
>> >>> (1c) We could name this
>> >>>
>> >>> type: MapToFields
>> >>> config:
>> >>>   fields:
>> >>>     *: element
>> >>>
>> >>> IIRC, we already have the special case of "*" in our join syntax, and
>> >>> we could re-use a bunch of the MapToFields infrastructure. But maybe
>> >>> it's too obscure?
>> >>>
>> >>> (2) Add an optional argument to error_handling to omit the metadata.
>> >>> This would require a bit of a hack to support ubiquitously, and
>> >>> wouldn't solve the more general problem.
>> >>>
>> >>> Maybe there are some other ideas as well?
