Hi,

Using PyFlink DDL, I am trying to:

1. Consume a Kafka JSON stream. The messages carry aggregate data, for example:
   "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
2. Split the "data" field so that I can process its values individually. For that I have defined a UDTF.
3. Store the UDTF output in a temporary view, so that each output row of the UDTF contains one name/value pair such as "0001" 105.0, "0002" 1.21, and so on (see the small sketch just after this list).
4. Use the values in this temporary view to calculate some aggregation metrics.
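To illustrate what I expect the UDTF to emit, here is a quick standalone sketch (plain Python with json.loads, outside of Flink, using the sample message above):

from json import loads

# Sample "data" payload from one Kafka message
sample = '{"0001":105.0,"0002":1.21,"0003":0.69,"0004":1.46,"0005":47.43,"0006":103.3}'

# This mirrors what the UDTF should do for each row: one (featureName, featureValue) pair per entry
for f_name, f_value in loads(sample).items():
    print(f_name, f_value)   # 0001 105.0, 0002 1.21, 0003 0.69, ...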
I am getting an SQL error for step 4. Code:

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes, Row
from pyflink.table.udf import udtf
from json import loads

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(
    exec_env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

@udtf(input_types=DataTypes.STRING(), result_types=[DataTypes.STRING(), DataTypes.DOUBLE()])
def split_feature_values(data_string):
    json_data = loads(data_string)
    for f_name, f_value in json_data.items():
        yield f_name, f_value

# Configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

# Register the UDTF
t_env.register_function("split", split_feature_values)

# ... string constants ...

# Init Kafka input table
t_env.execute_sql(f"""
    CREATE TABLE {INPUT_TABLE} (
        monitorId STRING,
        deviceId STRING,
        state INT,
        data STRING,
        time_str TIMESTAMP(3),
        WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{INPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'format' = 'json'
    )
""")

# 10 sec summary table
t_env.execute_sql(f"""
    CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
        monitorId STRING,
        featureName STRING,
        maxFv DOUBLE,
        minFv DOUBLE,
        avgFv DOUBLE,
        windowStart TIMESTAMP(3),
        WATERMARK FOR windowStart AS windowStart
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'format' = 'json'
    )
""")

# Join with the UDTF
t_env.execute_sql(f"""
    CREATE VIEW tmp_view AS
    SELECT * FROM (
        SELECT monitorId, T.featureName, T.featureValue, time_str
        FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, featureValue)
    )
""")

# Create 10 second view  <-------------------- this causes the error
t_env.execute_sql(f"""
    INSERT INTO {TEN_SEC_OUTPUT_TABLE}
    SELECT monitorId,
           featureName,
           MAX(featureValue),
           MIN(featureValue),
           AVG(featureValue),
           TUMBLE_START(time_str, INTERVAL '10' SECOND)
    FROM tmp_view
    GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
""")

The last SQL statement, where I calculate the metrics, causes the error. The error message is:

Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/aggregate_streaming_job.py", line 97, in <module>
    """)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 543, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 15 to line 4, column 20: Column 'data' not found in any table

I don't understand why Flink wants a "data" column.
I discard the "data" column in the temporary view, and it certainly does not exist in TEN_SEC_OUTPUT_TABLE. The only place it exists is the initial INPUT_TABLE, which is not referenced by the failing SQL statement! Clearly I have misunderstood something. Have I missed something when creating the temporary view?
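In case it helps, here is a sketch of how I can double-check which columns tmp_view actually exposes (run right after the CREATE VIEW statement; I am assuming the Table API's from_path/print_schema here):

# Print the schema of the temporary view to confirm which columns it contains
t_env.from_path("tmp_view").print_schema()
# I would expect only: monitorId, featureName, featureValue, time_str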