Hi Sumeet,

Due to a limitation in the original design of the PyFlink serializers, there
is no way to pass attribute names to Row in row-based operations. In
release 1.14, I am reworking the serializer implementations[1]. Once that
is done, accessing the attribute names of `Row` in row-based operations
will be supported[2].

As a workaround in release 1.13, you can manually set the field names of
the Row, e.g.
```
def my_table_tranform_fn(x: Row):
    x.set_field_names(['a', 'b', 'c'])
    ...
```

[1] https://issues.apache.org/jira/browse/FLINK-22612
[2] https://issues.apache.org/jira/browse/FLINK-22712

Best,
Xingbo

Sumeet Malhotra <sumeet.malho...@gmail.com> wrote on Wednesday, May 19, 2021 at 4:45 PM:

> Hi,
>
> According to the documentation for PyFlink Table row based operations [1],
> typical usage is as follows:
>
> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
> def split(x: Row) -> Row:
>     for s in x[1].split(","):
>         yield x[0], s
>
> table.flat_map(split)
>
> Is there any way that row fields inside the UDTF can be accessed by
> their attribute names instead of array index? In my use case, I'm doing the
> following:
>
> raw_data = t_env.from_path('MySource')
> raw_data \
>     .join_lateral(my_table_tranform_fn(raw_data).alias('a', 'b', 'c')) \
>     .flat_map(my_flat_map_fn).alias('x', 'y', 'z') \
>     .execute_insert("MySink")
>
> In the table function `my_flat_map_fn` I'm unable to access the fields of
> the row by their attribute names, i.e., assuming the input argument to the
> table function is x, I cannot access fields as x.a, x.b or x.c; instead I
> have to use x[0], x[1] and x[2]. The error I get is that `_fields` is not
> populated.
>
> In my use case, the number of columns is very high, and working with
> indexes is error-prone and unmaintainable.
>
> Any suggestions?
>
> Thanks,
> Sumeet
>
>
