# problem_3.py
# Calling .where() after .map() with a pandas-type UDF
# also resets the column names
# and doesn't actually filter the values.
import pandas as pd
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
table = t_env.from_elements(
elements=[
(1, 'China'),
;data').json_value('$.name', DataTypes.STRING()).alias('name'),
)
@udf(
result_type=(
'Row'
),
)
def example_map(row: Row):
print('\n'*3, f'{row=}', '\n'*3)
# will print:
# row=Row(id=1, data='{"
example_map(row: Row):
print('\n'*3, f'{row=}', '\n'*3)
# will print:
# row=Row(f0=1, f1='{"name": "Flink"}')
# expected:
# row=Row(id=1, data='{"name": "Flink"}')
data = json.loads(row.
Hi everyone,
The Python Table API seems to be a little buggy.
Here are some minimal examples of the strange behavior:
https://gist.github.com/nrdhm/88322a68fc3e9a14a5f4ab6ec13403cf
I was testing in pyflink-shell on our small cluster with Flink 1.17.
Docker image: flink:1.17.1-scala_2.12-java11
The