I'll copy the problems here if you prefer that.
# problem_1.py
# add_columns() resets column names to default names f0, f1, ..., fN
from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

table = t_env.from_elements(
    elements=[
        (1, '{"name": "Flink"}'),
        (2, '{"name": "hello"}'),
        (3, '{"name": "world"}'),
        (4, '{"name": "PyFlink"}'),
    ],
    schema=['id', 'data'],
).add_columns(
    # The alias should name the new column 'name', but it shows up as 'f0'.
    col('data').json_value('$.name', DataTypes.STRING()).alias('name'),
)


@udf(result_type='Row<id INT, name STRING>')
def example_map(row: Row):
    print('\n' * 3, f'{row=}', '\n' * 3)
    # Will print:
    # row=Row(id=1, data='{"name": "Flink"}', f0='Flink')
    return Row(row.id, row.name)


# Will raise with
# ValueError: 'name' is not in list
flow = (
    table
    .map(example_map)
    .execute()
    .print()
)
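
For what it's worth, a possible workaround for problem 1. This is only a sketch and I
haven't verified it; it assumes the extra column really does come out as 'f0', as in the
printout above, and simply re-applies the alias with an explicit select() before .map().

# Hypothetical workaround (untested): re-alias the auto-generated 'f0' column
# before handing the table to .map().
renamed = table.select(
    col('id'),
    col('data'),
    col('f0').alias('name'),  # 'f0' is the auto-generated name observed above
)
renamed.map(example_map).execute().print()

Doing the json_value() extraction inside select() instead of add_columns() might also
avoid the renaming issue, but I haven't checked that either.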
On Fri, Oct 27, 2023 at 2:14 PM Alexey Sergeev <[email protected]> wrote:
>
> Hi everyone,
>
>
> Python Table API seems to be a little bit buggy.
>
> Some minimal examples of strange behaviors here:
>
> https://gist.github.com/nrdhm/88322a68fc3e9a14a5f4ab6ec13403cf
>
>
>
> Was testing in pyflink-shell in our small cluster with Flink 1.17.
>
> Docker image: flink:1.17.1-scala_2.12-java11
>
>
>
> The third problem with pandas UDF concerns me the most.
>
>
>
> It seems like vectorized UDFs do not work at all with .filter()/.where()
> calls.
>
> Column names are reset to the default f0, f1, …, fN and values are not being
> filtered.
>
>
> And so, I have some questions:
>
> 1. Were you able to reproduce these problems?
> 2. Is it the expected behavior?
> 3. How can we get around this?
>
> Best regards, Alexey
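
For the archives, here is roughly how I read the third problem. This is my own minimal
sketch based on the description above, not the exact code from the gist; the column
names and values are made up.

# problem_3 (sketch): pandas UDF combined with .filter() / .where()
# Reported symptom: column names come back as f0, f1, ... and rows are not filtered.
import pandas as pd
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

table = t_env.from_elements(
    elements=[(1, 10), (2, 20), (3, 30)],
    schema=['id', 'amount'],
)


@udf(result_type=DataTypes.BIGINT(), func_type='pandas')
def double(v: pd.Series) -> pd.Series:
    return v * 2


(
    table
    .filter(col('id') > 1)  # expected: only the rows with id 2 and 3 remain
    .select(col('id'), double(col('amount')).alias('doubled'))
    .execute()
    .print()
)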