[ https://issues.apache.org/jira/browse/FLINK-20100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-20100:
-----------------------------------
    Labels: stale-major  (was: )

> Lag aggregate function does not return lag, but current row
> -----------------------------------------------------------
>
>                 Key: FLINK-20100
>                 URL: https://issues.apache.org/jira/browse/FLINK-20100
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Table SQL / Planner
>    Affects Versions: 1.11.2, 1.12.0
>            Reporter: Thilo Schneider
>            Priority: Major
>              Labels: stale-major
>
> The LAG aggregate function seems to always return the current row rather than
> the row one position behind it:
> {code:python}
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> t_env.execute_sql("""
> CREATE TABLE datagen (
>     foo INT,
>     message_time AS to_timestamp(from_unixtime(foo)),
>     WATERMARK FOR message_time AS message_time
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second'='3',
>     'fields.foo.kind'='sequence',
>     'fields.foo.start'='1',
>     'fields.foo.end'='10'
> )""")
>
> t = t_env.sql_query("SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen WINDOW w AS (ORDER BY message_time)")
>
> t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')")
> t.execute_insert("output")
> {code}
> This results in:
> {code:java}
> +I(1,1)    // Expected (1, null)
> +I(2,2)    // Expected (2, 1)
> +I(3,3)    // Expected (3, 2)
> +I(4,4)    // and so on
> +I(5,5)
> +I(6,6)
> +I(7,7)
> +I(8,8)
> +I(9,9)
> +I(10,10)
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
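For reference, the "Expected" column in the report follows from the usual LAG semantics: each output row pairs the current value with the value one row earlier in the window's ORDER BY, and NULL when no earlier row exists. The following is a minimal plain-Python sketch of those semantics, not Flink code. The helper name `lag` is hypothetical, and the sketch assumes the input is already sorted by message_time, which holds here because the sequence datagen source emits foo = 1..10 with message_time derived monotonically from foo.

```python
from typing import Iterable, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def lag(values: Iterable[T], offset: int = 1,
        default: Optional[T] = None) -> List[Tuple[T, Optional[T]]]:
    """Hypothetical reference helper: pair each value with the value
    `offset` rows earlier, or `default` (NULL) when no such row exists.
    Assumes `values` is already in the window's ORDER BY order."""
    if offset < 0:
        raise ValueError("offset must be non-negative")
    rows = list(values)
    result: List[Tuple[T, Optional[T]]] = []
    for i, current in enumerate(rows):
        previous = rows[i - offset] if i >= offset else default
        result.append((current, previous))
    return result


if __name__ == "__main__":
    # Mirrors the datagen sequence foo = 1..10 from the report.
    for row in lag(range(1, 11)):
        print(row)
```

Run as a script, this prints (1, None), (2, 1), (3, 2) and so on up to (10, 9), which matches the rows the reporter expected instead of the observed +I(n,n) pairs.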