[ 
https://issues.apache.org/jira/browse/FLINK-30977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-30977:
-----------------------------------
    Component/s: API / Python

> flink tumbling window stream converting to pandas dataframe not work
> --------------------------------------------------------------------
>
>                 Key: FLINK-30977
>                 URL: https://issues.apache.org/jira/browse/FLINK-30977
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>         Environment: pyflink1.15.2
>            Reporter: Joekwal
>            Priority: Major
>
> I want to know if tumbling window supported to convert to pandas?
> {code:java}
> code... #create env
> kafka_src = """
> CREATE TABLE if not exists `kafka_src` (
> ...
> `event_time` as CAST(`end_time` as TIMESTAMP(3)),
> WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND
> )
> with (
> 'connector' = 'kafka',
> 'topic' = 'topic',
> 'properties.bootstrap.servers' = '***',
> 'properties.group.id' = '***',
> 'scan.startup.mode' = 'earliest-offset',
> 'value.format' = 'debezium-json'
> );
> """  
>   
> t_env.execute_sql(kafka_src)
> table = st_env.sql_query("SELECT columns,`event_time`  \
>     FROM TABLE(TUMBLE(TABLE table_name, DESCRIPTOR(event_time), INTERVAL '1' 
> MINUTES))")
> table.execute().print()  #could print the result
> df = table.to_pandas()
> #schema is correct!
> schema = DataTypes.ROW([DataTypes.FIELD("column1", DataTypes.STRING()),
>                         .......
>                             ])
> table = st_env.from_pandas(df,schema=schema)
> st_env.create_temporary_view("view_table",table)
> st_env.sql_query("select * from view_table").execute().print() # Not 
> work!Can't print the result {code}
> Tumbling window stream from kafka source convert to pandas dataframe and it 
> can't print the result.The schema is right.I have tested in another job with 
> using batch stream from jdbc source.It can print the result.The only 
> different thing is the input stream.As doc mentioned, the bounded stream is 
> supported to convert to pandas.So what could have gone wrong?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to