[ 
https://issues.apache.org/jira/browse/FLINK-30977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joekwal updated FLINK-30977:
----------------------------
    Description: 
Is converting a tumbling-window table to a pandas DataFrame supported?
{code:python}
# code... (create the environment)

kafka_src = """
CREATE TABLE if not exists `kafka_src` (
...
`event_time` as CAST(`end_time` as TIMESTAMP(3)),
WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND
)
with (
'connector' = 'kafka',
'topic' = 'topic',
'properties.bootstrap.servers' = '***',
'properties.group.id' = '***',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
);
"""  
  
st_env.execute_sql(kafka_src)
table = st_env.sql_query("""
    SELECT columns, `event_time`
    FROM TABLE(TUMBLE(TABLE table_name, DESCRIPTOR(event_time), INTERVAL '1' MINUTES))
""")

table.execute().print()  # this prints the result

df = table.to_pandas()

# The schema is correct.
schema = DataTypes.ROW([DataTypes.FIELD("column1", DataTypes.STRING()),
                        .......
                            ])
table = st_env.from_pandas(df, schema=schema)
st_env.create_temporary_view("view_table", table)

# Does not work: the result is never printed.
st_env.sql_query("select * from view_table").execute().print()
{code}
When a tumbling-window table read from a Kafka source is converted to a pandas
DataFrame and back, the result is never printed, even though the schema is
correct. I tested the same steps in another job that reads a batch stream from
a JDBC source, and there the result is printed. The only difference is the
input stream. According to the documentation, converting a bounded stream to
pandas is supported. So what could have gone wrong?
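
One possible reading of this behavior, sketched without Flink: converting a
table to pandas collects the whole result on the client, which can only finish
when the input ends. In the sketch below, the generator `events`, the helper
`collect`, and its `limit` parameter are hypothetical stand-ins and not PyFlink
APIs; `tumble_start` only mimics how a one-minute tumbling window is assigned.
It illustrates the bounded versus unbounded distinction under those
assumptions and is not a confirmed diagnosis of this issue.

```python
import itertools
from datetime import datetime, timedelta


def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Return the start of the epoch-aligned tumbling window containing ts."""
    epoch = datetime(1970, 1, 1)
    return ts - (ts - epoch) % size


def events(start: datetime, step: timedelta):
    """Hypothetical unbounded source: yields (event_time, value) forever."""
    for i in itertools.count():
        yield start + i * step, i


def collect(rows, limit=None):
    """Materialize rows in memory; this only returns if the input ends."""
    bounded = rows if limit is None else itertools.islice(rows, limit)
    return list(bounded)


source = events(datetime(2023, 2, 9, 12, 0, 0), timedelta(seconds=20))
windowed = ((tumble_start(t, timedelta(minutes=1)), v) for t, v in source)

# With limit=None this call would never return, because the source never ends.
rows = collect(windowed, limit=6)
for window_start, value in rows:
    print(window_start, value)
```

Bounding the input is what lets `collect` return here; with an unbounded input
the same call blocks indefinitely, and nothing after it runs.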


> flink tumbling window stream converting to pandas dataframe not work
> --------------------------------------------------------------------
>
>                 Key: FLINK-30977
>                 URL: https://issues.apache.org/jira/browse/FLINK-30977
>             Project: Flink
>          Issue Type: Bug
>         Environment: PyFlink 1.15.2
>            Reporter: Joekwal
>            Priority: Major


