Hi Manas,

It seems like a bug. As a workaround, you can currently replace the UDTF SQL call with the equivalent Table API call:

t_env.register_table(
    "tmp_view",
    t_env.from_path(f"{INPUT_TABLE}")
         .join_lateral("split(data) as (featureName, featureValue)"))

This works for me. I'll try to find out what caused this exception.
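Spelled out a little more: the workaround only replaces the CREATE VIEW statement that calls the UDTF; everything downstream stays the same. A minimal sketch, assuming the "split" UDTF is registered as in your code (the select() projection is my addition, to mirror the columns your view exposes):

# Build the lateral join with the Table API instead of
# "CREATE VIEW tmp_view AS ... LATERAL TABLE(split(data)) ...".
joined = t_env.from_path(f"{INPUT_TABLE}") \
    .join_lateral("split(data) as (featureName, featureValue)") \
    .select("monitorId, featureName, featureValue, time_str")

# Register the result under the same name, so the later SQL
# statements can keep referring to tmp_view.
t_env.register_table("tmp_view", joined)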
Best,
Wei

> On 28 Jul 2020, at 18:33, Manas Kale <manaskal...@gmail.com> wrote:
>
> Hi,
> Using pyFlink DDL, I am trying to:
> 1. Consume a Kafka JSON stream. This has messages with aggregate data, for example:
>    "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
> 2. Split the field "data" so that I can process its values individually. For that, I have defined a UDTF.
> 3. Store the UDTF output in a temporary view. (Meaning each output of the UDTF will contain "0001" 105.0, "0002" 1.21, etc.)
> 4. Use the values in this temporary view to calculate some aggregation metrics.
>
> I am getting an SQL error for step 4.
>
> Code:
>
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes, Row
> from pyflink.table.udf import udtf
> from json import loads
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> t_env = StreamTableEnvironment.create(
>     exec_env,
>     environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
>
> @udtf(input_types=DataTypes.STRING(), result_types=[DataTypes.STRING(), DataTypes.DOUBLE()])
> def split_feature_values(data_string):
>     json_data = loads(data_string)
>     for f_name, f_value in json_data.items():
>         yield f_name, f_value
>
> # Configure the off-heap memory of the current taskmanager so that the
> # Python worker can use off-heap memory.
> t_env.get_config().get_configuration().set_string(
>     "taskmanager.memory.task.off-heap.size", '80m')
>
> # Register UDTF
> t_env.register_function("split", split_feature_values)
>
> # ... string constants ...
>
> # Init Kafka input table
> t_env.execute_sql(f"""
>     CREATE TABLE {INPUT_TABLE} (
>         monitorId STRING,
>         deviceId STRING,
>         state INT,
>         data STRING,
>         time_str TIMESTAMP(3),
>         WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = '{INPUT_TOPIC}',
>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>         'format' = 'json'
>     )
> """)
>
> # 10 sec summary table
> t_env.execute_sql(f"""
>     CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
>         monitorId STRING,
>         featureName STRING,
>         maxFv DOUBLE,
>         minFv DOUBLE,
>         avgFv DOUBLE,
>         windowStart TIMESTAMP(3),
>         WATERMARK FOR windowStart AS windowStart
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>         'format' = 'json'
>     )
> """)
>
> # Join with UDTF
> t_env.execute_sql(f"""
>     CREATE VIEW tmp_view AS
>     SELECT * FROM (
>         SELECT monitorId, T.featureName, T.featureValue, time_str
>         FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) AS T(featureName, featureValue)
>     )
> """)
>
> # Create 10 second view <--------------------- this causes the error
> t_env.execute_sql(f"""
>     INSERT INTO {TEN_SEC_OUTPUT_TABLE}
>     SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue),
>         AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
>     FROM tmp_view
>     GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
> """)
>
> The last SQL statement, where I calculate the metrics, causes the error.
> The error message is:
>
> Traceback (most recent call last):
>   File "/home/manas/IU_workspace/Flink_POC/pyflink/aggregate_streaming_job.py", line 97, in <module>
>     """)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 543, in execute_sql
>     return TableResult(self._j_tenv.executeSql(stmt))
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>     return f(*a, **kw)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
> : org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 15 to line 4, column 20: Column 'data' not found in any table
>
> I don't understand why Flink wants a "data" column. I discard the "data" column in the temporary view, and it certainly does not exist in TEN_SEC_OUTPUT_TABLE. The only place it exists is in the initial INPUT_TABLE, which is not relevant to the erroneous SQL statement!
> Clearly I have misunderstood something. Have I missed something when creating the temporary view?
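To your last question: I don't think you missed anything when creating the temporary view; as said above, this looks like a bug on our side. In case it is useful in the meantime, the 10-second aggregation can also be expressed directly in the Table API instead of the SQL INSERT INTO statement. A rough sketch, reusing the names from your snippet (untested against your setup):

from pyflink.table.window import Tumble

# Tumbling 10-second event-time window on time_str, keyed by
# monitorId and featureName, mirroring the failing SQL statement.
ten_sec_stats = t_env.from_path("tmp_view") \
    .window(Tumble.over("10.seconds").on("time_str").alias("w")) \
    .group_by("w, monitorId, featureName") \
    .select("monitorId, featureName, "
            "featureValue.max as maxFv, "
            "featureValue.min as minFv, "
            "featureValue.avg as avgFv, "
            "w.start as windowStart")

# execute_insert submits the job and writes into the Kafka-backed
# sink table, so no separate execute() call is needed here.
ten_sec_stats.execute_insert(TEN_SEC_OUTPUT_TABLE)

The string expressions follow the same expression DSL as the Java API, so featureValue.max corresponds to MAX(featureValue) in the SQL version.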