Thilo Schneider created FLINK-20100:
---------------------------------------
             Summary: Lag aggregate function does not return lag, but current row
                 Key: FLINK-20100
                 URL: https://issues.apache.org/jira/browse/FLINK-20100
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.11.2
            Reporter: Thilo Schneider


The lag aggregate function seems to always return the value of the current row instead of the value from the row one position behind it:

{code:python}
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Blink planner in streaming mode
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Source: foo = 1..10, with an event-time attribute derived from foo
t_env.execute_sql("""
    CREATE TABLE datagen (
        foo INT,
        message_time AS to_timestamp(from_unixtime(foo)),
        WATERMARK FOR message_time AS message_time
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second'='3',
        'fields.foo.kind'='sequence',
        'fields.foo.start'='1',
        'fields.foo.end'='10'
    )""")

# lagfoo should hold foo from the previous row in message_time order
t = t_env.sql_query("SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen WINDOW w AS (ORDER BY message_time)")

# Sink: print every result row to stdout
t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')")
t.execute_insert("output")
{code}

This results in:

{code}
+I(1,1)      // Expected (1, null)
+I(2,2)      // Expected (2, 1)
+I(3,3)      // Expected (3, 2)
+I(4,4)      // and so on
+I(5,5)
+I(6,6)
+I(7,7)
+I(8,8)
+I(9,9)
+I(10,10)
{code}
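For reference, the output the report expects follows from the standard SQL semantics of `LAG(value)`: offset 1 and default `NULL`, evaluated over rows in `ORDER BY` order. The sketch below models only those semantics in plain Python. It does not use PyFlink, and the helper name `expected_lag` is hypothetical.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def expected_lag(values: Sequence[T], offset: int = 1,
                 default: Optional[T] = None) -> List[Optional[T]]:
    """Hypothetical helper modelling SQL LAG(value, offset, default).

    Assumes `values` is already sorted in the window's ORDER BY order.
    For each position i it returns values[i - offset], or `default`
    when no row exists that many positions back.
    """
    if offset < 0:
        raise ValueError("offset must be non-negative")
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]


if __name__ == "__main__":
    foo = list(range(1, 11))  # the datagen sequence 1..10 from the report
    for f, lag_f in zip(foo, expected_lag(foo)):
        # Render Python's None as SQL NULL, in the print connector's +I(...) style
        print(f"+I({f},{'null' if lag_f is None else lag_f})")
```

With the default offset of 1, the first row gets `null` and every later row gets its predecessor's `foo`. Calling the helper with `offset=0` returns each row's own value, which is the pattern observed in the output above.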