[ https://issues.apache.org/jira/browse/FLINK-20100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-20100:
-----------------------------------
    Labels: stale-major  (was: )

> Lag aggregate function does not return lag, but current row
> -----------------------------------------------------------
>
>                 Key: FLINK-20100
>                 URL: https://issues.apache.org/jira/browse/FLINK-20100
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Table SQL / Planner
>    Affects Versions: 1.11.2, 1.12.0
>            Reporter: Thilo Schneider
>            Priority: Major
>              Labels: stale-major
>
> The LAG aggregate function always seems to return the value of the current row
> instead of the value from the row one position behind it:
> {code:python}
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> # Streaming mode with the Blink planner
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> # Bounded source emitting foo = 1..10, with an event-time attribute derived from foo
> t_env.execute_sql("""
> CREATE TABLE datagen (
>  foo INT,
>  message_time AS to_timestamp(from_unixtime(foo)),
>  WATERMARK FOR message_time AS message_time
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='3',
>  'fields.foo.kind'='sequence',
>  'fields.foo.start'='1',
>  'fields.foo.end'='10'
> )""")
>
> # lagfoo should hold foo from the previous row (NULL for the first row)
> t = t_env.sql_query(
>     "SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen "
>     "WINDOW w AS (ORDER BY message_time)")
> t_env.execute_sql(
>     "CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')")
> t.execute_insert("output")
> {code}
> This results in:
> {code:java}
> +I(1,1)  // Expected (1, null)
> +I(2,2)  // Expected (2, 1)
> +I(3,3)  // Expected (3, 2)
> +I(4,4)  // and so on
> +I(5,5)
> +I(6,6)
> +I(7,7)
> +I(8,8)
> +I(9,9)
> +I(10,10)
> {code}
>  
>  
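For reference, the expected semantics of `lag(foo) OVER w` can be modelled in plain Python. This is a minimal sketch, not Flink code: the helper name `lag` and its `offset`/`default` parameters are illustrative assumptions that mirror the optional offset and default arguments of the SQL `LAG` function, and it assumes the rows already arrive in `message_time` order, as they do with the `sequence` datagen above.

```python
from typing import Iterable, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def lag(values: Iterable[T], offset: int = 1,
        default: Optional[T] = None) -> List[Tuple[T, Optional[T]]]:
    """Hypothetical model of LAG: pair each row with the value `offset` rows
    earlier in the given order, or `default` (None, i.e. SQL NULL) if there
    are not enough preceding rows."""
    if offset < 1:
        raise ValueError("offset must be >= 1")
    ordered = list(values)  # rows are assumed to be sorted already
    return [
        (v, ordered[i - offset] if i >= offset else default)
        for i, v in enumerate(ordered)
    ]


# Reproduce the shape of the print sink output for foo = 1..10
rows = lag(range(1, 11))
for foo, lagfoo in rows:
    print(f"+I({foo},{'null' if lagfoo is None else lagfoo})")
```

Run against the same input sequence, this yields `(1, null)`, `(2, 1)`, `(3, 2)`, and so on, which is the output the report expects rather than the `(n, n)` pairs Flink actually printed.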



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
