Currently our YAML API supports basic streaming, including setting
windowing for aggregations, but there's no way to retrieve the
windowing/timestamp metadata (short of stepping out of YAML proper and
writing a DoFn in Python, Java, etc.). It would probably be quite useful to
have a more native way of getting this.

One option would be to add a built-in transform to extract this
information, e.g. something like

- type: ReifyWindowingInfo
  config:
    new_field1: timestamp
    new_field2: window
    new_field3: window.end
    ...

The possible values on the RHS of the map would be a fixed list; supporting
things like window.end or pane_info.index would be desirable as their types
are schema-compatible (unlike a raw Window or PaneInfo object). One could
then use this information in downstream transforms.
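
To make the fixed-list idea a bit more concrete, here is a rough Python
sketch of how such a config could be validated and resolved. Everything in it
is hypothetical: ALLOWED_SOURCES (and its contents), validate, reify, and the
SimpleNamespace stand-ins for the per-element context are illustrative only,
and a real implementation would read these values inside a DoFn.

```python
from functools import reduce
from types import SimpleNamespace

# Hypothetical fixed list of supported right-hand-side values.
ALLOWED_SOURCES = {"timestamp", "window.start", "window.end", "pane_info.index"}


def validate(config):
    """Reject any right-hand-side value that is not in the fixed list."""
    unknown = sorted(set(config.values()) - ALLOWED_SOURCES)
    if unknown:
        raise ValueError(f"Unsupported windowing info: {unknown}")


def reify(config, context):
    """Return {new_field: value} by following each dotted path in context."""
    validate(config)
    return {
        field: reduce(getattr, path.split("."), context)
        for field, path in config.items()
    }


# Stand-in for what the runner would provide for a single element.
context = SimpleNamespace(
    timestamp=12,
    window=SimpleNamespace(start=10, end=20),
    pane_info=SimpleNamespace(index=0),
)
extra_fields = reify({"ts": "timestamp", "window_end": "window.end"}, context)
```

The resulting dict would be merged into the output row, so the new fields
behave like any other schema field downstream.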

A second option would be to enhance MapToFields to make this information
available. Currently this transform looks like

- type: MapToFields
  config:
    language: python  # java is also supported, javascript, etc. conceivable
    fields:
      output_field1: input_field + another_input_field
      output_field2:
        callable: |
            def my_inline_function(row):
                return row.input_field + row.another_input_field
        ...

The first case, called the "expression" case, is syntactic sugar that
roughly reifies all[1] input fields as locals and translates to the second.

For the second case, one could treat this similarly to the process method of
a DoFn and allow additional annotated arguments (e.g. DoFn.TimestampParam
in Python, @Timestamp annotation for Java). We would detect these and
propagate them up to the generated DoFn.
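
A minimal sketch of what the detection step might look like on the Python
side, using only the standard library. The TIMESTAMP and WINDOW sentinels are
hypothetical stand-ins for markers such as DoFn.TimestampParam, and
requested_params and call_with_context are made-up helper names, not existing
Beam APIs.

```python
import inspect

# Hypothetical sentinels standing in for markers such as
# DoFn.TimestampParam and DoFn.WindowParam.
TIMESTAMP = object()
WINDOW = object()
MARKERS = (("timestamp", TIMESTAMP), ("window", WINDOW))


def requested_params(fn):
    """Map argument name -> kind for each argument whose default is a marker."""
    found = {}
    for name, param in inspect.signature(fn).parameters.items():
        for kind, marker in MARKERS:
            if param.default is marker:
                found[name] = kind
    return found


def call_with_context(fn, row, context):
    """Call fn(row, ...), filling marker arguments from the context dict."""
    extras = {name: context[kind] for name, kind in requested_params(fn).items()}
    return fn(row, **extras)


# A user callable that asks for the element timestamp via a default value.
def my_inline_function(row, ts=TIMESTAMP):
    return row["input_field"] + ts


result = call_with_context(
    my_inline_function, {"input_field": 1}, {"timestamp": 41, "window": None}
)
```

The generated DoFn would declare the same markers on its own process method
only when some callable requests them, so callables that ignore windowing
pay nothing extra.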

We could consider supporting the "expression" case via some magic variables
(or a special namespace) or require the second form for this capability.

We could, of course, offer both options as well.

Anyone have any opinions or other ideas here?

- Robert



[1] As an optimization we only capture those locals that appear textually
in the body of the expression.
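
For illustration, the textual capture could be approximated as below. This
is a sketch only, assuming rows are plain dicts; referenced_fields and
expression_to_callable are hypothetical names and not the actual
implementation.

```python
import ast


def referenced_fields(expression, input_fields):
    """Return the input fields whose names appear in the expression."""
    tree = ast.parse(expression, mode="eval")
    names = {node.id for node in ast.walk(tree) if isinstance(node, ast.Name)}
    return sorted(names & set(input_fields))


def expression_to_callable(expression, input_fields):
    """Translate an expression into a row -> value callable."""
    used = referenced_fields(expression, input_fields)
    code = compile(expression, "<yaml expression>", "eval")

    def fn(row):
        # Only the fields that appear in the expression are exposed as locals.
        return eval(code, {}, {name: row[name] for name in used})

    return fn


fields = ["input_field", "another_input_field", "unused_field"]
fn = expression_to_callable("input_field + another_input_field", fields)
value = fn({"input_field": 1, "another_input_field": 2, "unused_field": 3})
```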
