[ https://issues.apache.org/jira/browse/FLINK-28253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu closed FLINK-28253.
---------------------------
    Resolution: Duplicate

> It reports "LocalDateTime is not supported in PyFlink currently" when converting between Table and DataStream
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28253
>                 URL: https://issues.apache.org/jira/browse/FLINK-28253
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.14.0, 1.15.0
>            Reporter: Dian Fu
>            Priority: Major
>             Fix For: 1.16.0
>
>
> For the following job:
> {code}
> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
> from pyflink.table import EnvironmentSettings
> from pyflink.table.table_environment import StreamTableEnvironment
>
> if __name__ == '__main__':
>     env = StreamExecutionEnvironment.get_execution_environment()
>     settings = EnvironmentSettings.new_instance() \
>         .in_streaming_mode() \
>         .build()
>     t_env = StreamTableEnvironment.create(stream_execution_environment=env,
>                                           environment_settings=settings)
>     t_env.execute_sql("""
>         CREATE TABLE events (
>             `id` VARCHAR,
>             `source` VARCHAR,
>             `resources` VARCHAR,
>             `time` TIMESTAMP(3),
>             WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
>             PRIMARY KEY (`id`) NOT ENFORCED
>         ) WITH (
>             'connector' = 'filesystem',
>             'path' = 'file:///path/to/input',
>             'format' = 'csv'
>         )
>     """)
>     events_stream_table = t_env.from_path('events')
>     events_stream = t_env.to_data_stream(events_stream_table)
>     # Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.SQL_TIMESTAMP()])
>     # now do some processing - let's filter by the type of event we get
>     codebuild_stream = events_stream.filter(
>         lambda event: event['source'] == 'aws.codebuild'
>     )
>     codebuild_stream.print()
>     env.execute()
> {code}
> It reports the following exception:
> {code}
> Traceback (most recent call last):
>   File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 47, in <module>
>     process()
>   File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 39, in process
>     lambda event: event['source'] == 'aws.codebuild'
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 432, in filter
>     self._j_data_stream.getTransformation().getOutputType())
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1070, in _from_java_type
>     TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType())))
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1042, in _from_java_type
>     j_row_field_types]
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1041, in <listcomp>
>     row_field_types = [_from_java_type(j_row_field_type) for j_row_field_type in
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1072, in _from_java_type
>     raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info)
> TypeError: The java type info: LocalDateTime is not supported in PyFlink currently.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)