# problem_2.py
# .alias() does not work either
#
# Reproduction from a PyFlink mailing-list report (Flink 1.17, pyflink-shell).
# NOTE(review): TableEnvironment, EnvironmentSettings, udf and Row are not
# imported here; this assumes the pyflink-shell session predefines them.
# Outside the shell they must be imported from the pyflink packages.
import json

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

table = t_env.from_elements(
    elements=[
        (1, '{"name": "Flink"}'),
        (2, '{"name": "hello"}'),
        (3, '{"name": "world"}'),
        (4, '{"name": "PyFlink"}'),
    ],
    schema=['id', 'data'],
).alias('id', 'data')  # reported: these names do not reach the map UDF's Row


@udf(result_type='Row<id INT, name STRING>')
def example_map(row: Row):
    """Return ``Row(id, name)`` where *name* is read from the JSON in ``data``.

    Reported behavior: the Row passed to this map UDF carries the default
    field names, e.g. ``Row(f0=1, f1='{"name": "Flink"}')``, instead of the
    expected ``Row(id=1, data='{"name": "Flink"}')``. As a result, attribute
    access such as ``row.data`` fails with ``ValueError: 'data' is not in list``.

    Workaround: read the fields by position (0 = id, 1 = data, matching the
    ``schema`` given to ``from_elements`` above). This works regardless of
    which field names the runtime assigns.
    """
    row_id, raw_json = row[0], row[1]
    name = json.loads(raw_json)['name']
    return Row(row_id, name)


# `.print()` returns None, so its result is not bound to a name.
table.map(example_map).execute().print()

# ---------------------------------------------------------------------------
# On Fri, Oct 27, 2023 at 2:14 PM Alexey Sergeev <alexey.nr...@gmail.com> wrote:
# >
# > Hi everyone,
# >
# > Python Table API seems to be a little bit buggy.
# >
# > Some minimal examples of strange behaviors here:
# >
# > https://gist.github.com/nrdhm/88322a68fc3e9a14a5f4ab6ec13403cf
# >
# > Was testing in pyflink-shell in our small cluster with Flink 1.17.
# >
# > Docker image: flink:1.17.1-scala_2.12-java11
# >
# > The third problem with pandas UDF concerns me the most.
# >
# > It seems like Vectorized UDF do not work at all with .filter() /.where()
# > calls.
# >
# > Columns name are reset to default f0, f1, …, fN and values are not being
# > filtered.
# >
# > And so, I have some questions:
# >
# > 1. Was you able to reproduce these problems?
# > 2. Is it the expected behavior?
# > 3. How can we get around this?
# >
# > Best regards, Alexey