Hi Manas,

It seems to be a bug in the CREATE VIEW operation. I have created a JIRA for it: https://issues.apache.org/jira/browse/FLINK-18750
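For reference, here is a minimal sketch of the workaround from my previous mail applied to your job (assuming the table-name constants and the registered "split" UDTF from your script; not tested against your exact setup):

# Build tmp_view with the Table API join_lateral instead of CREATE VIEW,
# keeping the same columns the original view selected.
t_env.register_table(
    "tmp_view",
    t_env.from_path(f"{INPUT_TABLE}")
         .join_lateral("split(data) as (featureName, featureValue)")
         .select("monitorId, featureName, featureValue, time_str"))

# The tumbling-window aggregation from your script can then run unchanged
# against the registered view.
t_env.execute_sql(f"""
    INSERT INTO {TEN_SEC_OUTPUT_TABLE}
    SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue),
           AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
    FROM tmp_view
    GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
""")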
Until it is fixed, please do not use the CREATE VIEW operation for the UDTF call.

Best,
Wei

> On Jul 28, 2020, at 21:19, Wei Zhong <weizhong0...@gmail.com> wrote:
> 
> Hi Manas,
> 
> It seems like a bug. As a workaround for now, you can try to replace the UDTF SQL call with code like this:
> 
> t_env.register_table("tmp_view", t_env.from_path(f"{INPUT_TABLE}").join_lateral("split(data) as (featureName, featureValue)"))
> 
> This works for me. I'll try to find out what caused this exception.
> 
> Best,
> Wei
> 
>> On Jul 28, 2020, at 18:33, Manas Kale <manaskal...@gmail.com> wrote:
>> 
>> Hi,
>> Using pyFlink DDL, I am trying to:
>> 1. Consume a Kafka JSON stream. This has messages with aggregate data, for example:
>>    "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
>> 2. Split the field "data" so that I can process its values individually. For that, I have defined a UDTF.
>> 3. Store the UDTF output in a temporary view. (Meaning each output of the UDTF will contain "0001" 105.0, "0002" 1.21, etc.)
>> 4. Use the values in this temporary view to calculate some aggregation metrics.
>> 
>> I am getting an SQL error for step 4.
>> 
>> Code:
>> 
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes, Row
>> from pyflink.table.udf import udtf
>> from json import loads
>> 
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> t_env = StreamTableEnvironment.create(exec_env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
>> 
>> @udtf(input_types=DataTypes.STRING(), result_types=[DataTypes.STRING(), DataTypes.DOUBLE()])
>> def split_feature_values(data_string):
>>     json_data = loads(data_string)
>>     for f_name, f_value in json_data.items():
>>         yield f_name, f_value
>> 
>> # Configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
>> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
>> 
>> # Register UDTF
>> t_env.register_function("split", split_feature_values)
>> 
>> # ... string constants ...
>> 
>> # Init Kafka input table
>> t_env.execute_sql(f"""
>>     CREATE TABLE {INPUT_TABLE} (
>>         monitorId STRING,
>>         deviceId STRING,
>>         state INT,
>>         data STRING,
>>         time_str TIMESTAMP(3),
>>         WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
>>     ) WITH (
>>         'connector' = 'kafka',
>>         'topic' = '{INPUT_TOPIC}',
>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>         'format' = 'json'
>>     )
>> """)
>> 
>> # 10 sec summary table
>> t_env.execute_sql(f"""
>>     CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
>>         monitorId STRING,
>>         featureName STRING,
>>         maxFv DOUBLE,
>>         minFv DOUBLE,
>>         avgFv DOUBLE,
>>         windowStart TIMESTAMP(3),
>>         WATERMARK FOR windowStart AS windowStart
>>     ) WITH (
>>         'connector' = 'kafka',
>>         'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>         'format' = 'json'
>>     )
>> """)
>> 
>> # Join with UDTF
>> t_env.execute_sql(f"""
>>     CREATE VIEW tmp_view AS
>>     SELECT * FROM (
>>         SELECT monitorId, T.featureName, T.featureValue, time_str
>>         FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, featureValue)
>>     )
>> """)
>> 
>> # Create 10 second view <--------------------- this causes the error
>> t_env.execute_sql(f"""
>>     INSERT INTO {TEN_SEC_OUTPUT_TABLE}
>>     SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue), AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
>>     FROM tmp_view
>>     GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
>> """)
>> 
>> The last SQL statement, where I calculate the metrics, causes the error. The error message is:
>> 
>> Traceback (most recent call last):
>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/aggregate_streaming_job.py", line 97, in <module>
>>     """)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 543, in execute_sql
>>     return TableResult(self._j_tenv.executeSql(stmt))
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>     answer, self.gateway_client, self.target_id, self.name)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>     return f(*a, **kw)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>>     format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
>> : org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 15 to line 4, column 20: Column 'data' not found in any table
>> 
>> I don't understand why Flink wants a "data" column. I discard the "data" column in the temporary view, and it certainly does not exist in TEN_SEC_OUTPUT_TABLE. The only place it exists is in the initial INPUT_TABLE, which is not relevant to the erroneous SQL statement!
>> Clearly I have misunderstood something. Have I missed something when creating the temporary view?