I'll copy the problems here if you prefer that.

# problem_1.py

# add_columns() resets column names to default names f0, f1, ..., fN

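from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())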

table = t_env.from_elements(
    elements=[
        (1, '{"name": "Flink"}'),
        (2, '{"name": "hello"}'),
        (3, '{"name": "world"}'),
        (4, '{"name": "PyFlink"}')
    ],
    schema=['id', 'data'],
).add_columns(
    col('data').json_value('$.name', DataTypes.STRING()).alias('name'),
)

@udf(
    result_type=(
        'Row<id INT, name STRING>'
    ),
)
def example_map(row: Row):
    print('\n'*3, f'{row=}', '\n'*3)
    # Prints the row below; note the added column is named f0, not name:
    # row=Row(id=1, data='{"name": "Flink"}', f0='Flink')
    return Row(row.id, row.name)

# Raises, because row.name is looked up but the column arrives as f0:
# ValueError: 'name' is not in list

flow = (
    table
    .map(example_map)
    .execute().print()
)


On Fri, Oct 27, 2023 at 2:14 PM Alexey Sergeev <alexey.nr...@gmail.com> wrote:
>
> Hi everyone,
>
>
> Python Table API seems to be a little bit buggy.
>
> Some minimal examples of strange behaviors here:
>
> https://gist.github.com/nrdhm/88322a68fc3e9a14a5f4ab6ec13403cf
>
>
>
> Was testing in pyflink-shell in our small cluster with Flink 1.17.
>
> Docker image: flink:1.17.1-scala_2.12-java11
>
>
>
> The third problem with pandas UDF concerns me the most.
>
>
>
> It seems that vectorized UDFs do not work at all with .filter() / .where()
> calls.
>
> Column names are reset to the defaults f0, f1, …, fN, and values are not
> filtered.
>
>
> And so, I have some questions:
>
>   1. Were you able to reproduce these problems?
>   2. Is this the expected behavior?
>   3. How can we get around this?
>
> Best regards, Alexey
