I probably missed something. Tried this toy example:

pipeline:
  transforms:
    - type: Create
      config:
        elements: [1, 2, 3, 4, -1]
    - type: MapToFields
      input: Create
      name: MapToFields_1
      config:
        language: python
        fields:
          element:
            callable: |
              import math
              def process_num(row):
                return math.sqrt(row.element)
        error_handling:
          output: my_error_output
    - type: LogForTesting
      input: MapToFields_1
    - type: MapToFields
      input: MapToFields_1.my_error_output
      name: MapToFields_2
      config:
        language: python
        fields:
          element:
            callable: |
              # return the raw element
              def process_error_row(row):
                return row.element[0]
    - type: LogForTesting
      input: MapToFields_2

It looks like MapToFields is already enough to extract anything returned by
error_handling, including the original element without the error metadata.
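
To make the access pattern explicit, here is a plain-Python sketch of what I
think is happening. It does not use Beam at all. Row, ErrorRow and
map_with_error_output are hypothetical stand-ins, and the error row field
names (element, msg, stack) are my assumption about the error output schema,
so treat this as an illustration rather than the actual implementation.

```python
import math
import traceback
from collections import namedtuple

# Hypothetical stand-ins for Beam rows. The error field names
# (element, msg, stack) are an assumption, not taken from this thread.
Row = namedtuple("Row", ["element"])
ErrorRow = namedtuple("ErrorRow", ["element", "msg", "stack"])


def map_with_error_output(rows, fn):
    """Apply fn to each row and route failing rows to a separate list."""
    good, bad = [], []
    for row in rows:
        try:
            good.append(Row(element=fn(row)))
        except Exception as exc:
            # Keep the original row together with metadata about the failure.
            bad.append(
                ErrorRow(element=row, msg=repr(exc), stack=traceback.format_exc())
            )
    return good, bad


rows = [Row(element=x) for x in [1, 2, 3, 4, -1]]
good, bad = map_with_error_output(rows, lambda row: math.sqrt(row.element))

# Same access as process_error_row above: the error row's "element" holds the
# original row, and [0] picks that row's first (and only) field.
recovered = [err.element[0] for err in bad]
```

In this sketch only the -1 row fails, and the second mapping step recovers
the raw value from the error row, which is what MapToFields_2 does in the
pipeline above.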

On Sat, Oct 19, 2024 at 2:55 AM Ahmed Abualsaud via dev <dev@beam.apache.org>
wrote:

> Another option is to add a second DLQ that outputs just the original rows,
> i.e. the user has the option to fetch failed rows with or without metadata.
> It would take some work on our side to add this second DLQ to
> existing transforms, but that seems pretty straightforward.
>
> On Sat, Oct 19, 2024 at 1:03 AM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> I came across an interesting user report at
>> https://github.com/apache/beam/issues/32866 which made me realize that,
>> while providing metadata about a bad element in the "bad records" output
>> is useful, we don't make it easy to extract the output into a PCollection
>> of the original elements. The output schema contains the original
>> element as well as metadata about what error occurred, and in an
>> ordinary Beam pipeline one could easily apply a Map(lambda error_row:
>> error_row.element) but YAML doesn't have Map, just MapToFields
>> (primarily to be more schema friendly).
>>
>> There are a couple of options:
>>
>> (0) Leave things as they are. One can write
>>
>> type: MapToFields
>> config:
>>   fields:
>>     fld1: element.fld1
>>     fld2: element.fld2
>>     ...
>>
>>
>> This is of course a bit ugly as one needs to enumerate (and know) the
>> set of original fields.
>>
>> (1a) Provide a special operation "Unnest" that takes a single field
>> and emits it as the top-level element. This can of course result in
>> unschema'd PCollections (which are supported, but generally don't play
>> as well with the other operations, including xlang ones).
>>
>> (1b) Just provide a Map. This is a generalization of 1a, but on the
>> other hand would be more prone to abuse.
>>
>> (1c) We could name this
>>
>> type: MapToFields
>> config:
>>   fields:
>>     *: element
>>
>> IIRC, we already have the special case of "*" in our join syntax, and
>> we could re-use a bunch of the MapToFields infrastructure. But maybe
>> it's too obscure?
>>
>> (2) Add an optional argument to error_handling to omit the metadata.
>> This would require a bit of a hack to support ubiquitously, and
>> wouldn't solve the more general problem.
>>
>> Maybe there are some other ideas as well?
>>
>
