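Hi Manas,

A table function cannot be called in the SELECT list the way a scalar function can, which is why validation reports "No match found for function signature split(<CHARACTER>)". Instead, join the input table with the Python UDTF using LATERAL TABLE. You can try the following SQL: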

ddl_populate_temporary_table = f"""
    INSERT INTO {TEMPORARY_TABLE}
    SELECT * FROM (
        SELECT monitorId, featureName, featureData, time_st
        FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) AS T(featureName, featureData)
    ) t
"""

Best,
Xingbo

Manas Kale <manaskal...@gmail.com> wrote on Wed, Jul 15, 2020 at 7:31 PM:
> Hi,
> I am trying to use a UserDefined Table Function to split up some data as
> follows:
>
> from pyflink.table.udf import udtf
>
> @udtf(input_types=DataTypes.STRING(), result_types=[DataTypes.STRING(), DataTypes.DOUBLE()])
> def split_feature_values(data_string):
>     json_data = loads(data_string)
>     for f_name, f_value in json_data.items():
>         yield (f_name, f_value)
>
> # configure the off-heap memory of current taskmanager to enable the python worker uses off-heap memory.
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
>
> # Register UDTF
> t_env.register_function("split", split_feature_values)
> ddl_source = f"""
>     CREATE TABLE {INPUT_TABLE} (
>         `monitorId` STRING,
>         `deviceId` STRING,
>         `state` INT,
>         `data` STRING,
>         `time_st` TIMESTAMP(3),
>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = '{INPUT_TOPIC}',
>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>         'format' = 'json'
>     )
> """
>
> ddl_temporary_table = f"""
>     CREATE TABLE {TEMPORARY_TABLE} (
>         `monitorId` STRING,
>         `featureName` STRING,
>         `featureData` DOUBLE,
>         `time_st` TIMESTAMP(3),
>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
>     )
> """
>
> ddl_populate_temporary_table = f"""
>     INSERT INTO {TEMPORARY_TABLE}
>     SELECT monitorId, split(data), time_st
>     FROM {INPUT_TABLE}
> """
>
> t_env.execute_sql(ddl_source)
> t_env.execute_sql(ddl_temporary_table)
> t_env.execute_sql(ddl_populate_temporary_table)
>
>
> However, I get the following error :
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
> : org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 3, column 23 to line 3, column 33: No match found for function
> signature split(<CHARACTER>)
>
> I believe I am using the correct call to register the UDTF as per [1]. Am
> I missing something?
>
> Thanks,
> Manas
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#table-functions
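To make the semantics of the LATERAL TABLE join in the SQL above concrete, here is a plain-Python sketch (no Flink involved, so it is only an emulation, not how Flink executes it). Each input row is paired with every row the table function yields for it; an input row for which the function yields nothing produces no output, which is the inner-join behaviour of the comma syntax. The helper `lateral_join` and the sample rows are made up for illustration, and the `float()` cast is an addition to match the DOUBLE result type declared in the question.

```python
import json
from datetime import datetime
from typing import Iterator, List, Tuple


def split_feature_values(data_string: str) -> Iterator[Tuple[str, float]]:
    """Same idea as the UDTF in the question: one output row per JSON key."""
    for f_name, f_value in json.loads(data_string).items():
        # float() is an assumption, added to match the declared DOUBLE type.
        yield f_name, float(f_value)


def lateral_join(rows: List[dict]) -> List[Tuple[str, str, float, datetime]]:
    """Hypothetical emulation of
    FROM input, LATERAL TABLE(split(data)) AS T(featureName, featureData)

    Every input row is combined with each row the table function yields.
    Rows for which the function yields nothing are dropped (inner join).
    """
    out = []
    for row in rows:
        for feature_name, feature_data in split_feature_values(row["data"]):
            out.append((row["monitorId"], feature_name, feature_data, row["time_st"]))
    return out


if __name__ == "__main__":
    ts = datetime(2020, 1, 1)  # made-up timestamp for the sample rows
    sample = [
        {"monitorId": "m1", "data": '{"temp": 21.5, "humidity": 40}', "time_st": ts},
        {"monitorId": "m2", "data": "{}", "time_st": ts},  # yields no rows
    ]
    for r in lateral_join(sample):
        print(r)
```

If you need to keep input rows for which the UDTF emits nothing, Flink SQL also supports `LEFT JOIN LATERAL TABLE(split(data)) AS T(featureName, featureData) ON TRUE`, which pads those rows with NULLs instead of dropping them.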