Hi Manas,

It seems to be a bug in the CREATE VIEW operation. I have created a JIRA ticket for it:
https://issues.apache.org/jira/browse/FLINK-18750

Until it is fixed, please avoid using the CREATE VIEW operation with UDTF calls.

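In the meantime, the joined table can be registered through the Table API instead of a SQL view (this is the same approach as in my previous mail quoted below). A minimal sketch, assuming the same "split" UDTF and INPUT_TABLE as in your job:

    # register the result of the lateral UDTF join directly as a table,
    # bypassing the CREATE VIEW statement that triggers the bug
    t_env.register_table(
        "tmp_view",
        t_env.from_path(f"{INPUT_TABLE}")
             .join_lateral("split(data) as (featureName, featureValue)"))
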
Best,
Wei

> On Jul 28, 2020, at 21:19, Wei Zhong <weizhong0...@gmail.com> wrote:
> 
> Hi Manas,
> 
> It seems like a bug. As a workaround for now, you can try replacing the UDTF
> SQL call with code like this:
> 
> t_env.register_table("tmp_view",
>     t_env.from_path(f"{INPUT_TABLE}").join_lateral(
>         "split(data) as (featureName, featureValue)"))
> 
> This works for me. I’ll try to find out what caused this exception.
> 
> Best,
> Wei
> 
>> On Jul 28, 2020, at 18:33, Manas Kale <manaskal...@gmail.com> wrote:
>> 
>> Hi,
>> Using PyFlink DDL, I am trying to:
>> 1. Consume a Kafka JSON stream. This has messages with aggregate data, for example:
>>    "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
>> 2. Split the field "data" so that I can process its values individually. For that, I have defined a UDTF.
>> 3. Store the UDTF output in a temporary view. (Meaning each output row of the UDTF will contain "0001" 105.0, "0002" 1.21, etc.)
>> 4. Use the values in this temporary view to calculate some aggregation metrics.
>> I am getting an SQL error for step 4.
>> Code:
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes, Row
>> from pyflink.table.udf import udtf
>> from json import loads
>> 
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> t_env = StreamTableEnvironment.create(
>>     exec_env,
>>     environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
>> 
>> 
>> @udtf(input_types=DataTypes.STRING(),
>>       result_types=[DataTypes.STRING(), DataTypes.DOUBLE()])
>> def split_feature_values(data_string):
>>     json_data = loads(data_string)
>>     for f_name, f_value in json_data.items():
>>         yield f_name, f_value
>> 
>> # Configure the off-heap memory of the current taskmanager so that the Python
>> # worker can use off-heap memory.
>> t_env.get_config().get_configuration().set_string(
>>     "taskmanager.memory.task.off-heap.size", '80m')
>> 
>> # Register UDTF
>> t_env.register_function("split", split_feature_values)
>> # ... string constants....
>> 
>> # Init Kafka input table
>> t_env.execute_sql(f"""               
>>     CREATE TABLE {INPUT_TABLE} (
>>         monitorId STRING,
>>         deviceId STRING,
>>         state INT,
>>         data STRING,
>>         time_str TIMESTAMP(3),
>>         WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
>>     ) WITH (
>>         'connector' = 'kafka',
>>         'topic' = '{INPUT_TOPIC}',
>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>         'format' = 'json'
>>     )
>> """)
>> 
>> # 10 sec summary table
>> t_env.execute_sql(f"""
>>     CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
>>         monitorId STRING,
>>         featureName STRING,
>>         maxFv DOUBLE,
>>         minFv DOUBLE,
>>         avgFv DOUBLE,
>>         windowStart TIMESTAMP(3),
>>         WATERMARK FOR windowStart AS windowStart
>>     ) WITH (
>>         'connector' = 'kafka',
>>         'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>         'format' = 'json'
>>     )
>> """)
>> 
>> # Join with UDTF
>> t_env.execute_sql(f"""
>>     CREATE VIEW tmp_view AS
>>     SELECT * FROM ( 
>>         SELECT monitorId, T.featureName, T.featureValue, time_str
>>         FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, featureValue)
>>     )
>> """)
>> 
>> # Create 10 second view <--------------------- this causes the error
>> t_env.execute_sql(f"""
>>     INSERT INTO {TEN_SEC_OUTPUT_TABLE}
>>     SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue),
>>         AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
>>     FROM tmp_view
>>     GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
>> """)
>> 
>> The last SQL statement, where I calculate the metrics, causes the error. The
>> error message is:
>> Traceback (most recent call last):
>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/aggregate_streaming_job.py", line 97, in <module>
>>     """)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 543, in execute_sql
>>     return TableResult(self._j_tenv.executeSql(stmt))
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>     answer, self.gateway_client, self.target_id, self.name)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>     return f(*a, **kw)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>>     format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
>> : org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 15 to line 4, column 20: Column 'data' not found in any table
>> 
>> I don't understand why Flink wants a "data" column. I discard the "data"
>> column in the temporary view, and it certainly does not exist in the
>> TEN_SEC_OUTPUT_TABLE. The only place it exists is in the initial INPUT_TABLE,
>> which is not relevant to the erroneous SQL statement!
>> Clearly I have misunderstood something. Have I missed something when creating
>> the temporary view?
>> 
>> 
>> 
> 
