Thilo Schneider created FLINK-20128:
---------------------------------------

             Summary: Data loss for over windows with rows unbounded preceding
                 Key: FLINK-20128
                 URL: https://issues.apache.org/jira/browse/FLINK-20128
             Project: Flink
          Issue Type: Bug
          Components: API / Python, Table SQL / Planner
    Affects Versions: 1.11.2, 1.12.0
            Reporter: Thilo Schneider


When using partitioned, unbounded over windows, all partitions but one are dropped from the output dataset:
{code:python}
# Setup
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from biafflink import debug_print_table  # not used below

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().get_configuration().set_integer("table.exec.resource.default-parallelism", 1)

# Source: foo runs from 0 to 19, id alternates between 0 and 1
t_env.execute_sql("""
CREATE TABLE datagen (
    foo INT,
    id AS mod(foo, 2),
    message_time AS to_timestamp(from_unixtime(FLOOR(foo/2))),
    WATERMARK FOR message_time AS message_time
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='2',
    'fields.foo.kind'='sequence',
    'fields.foo.start'='0',
    'fields.foo.end'='19'
)""")

# Sink: print every emitted row
t_env.execute_sql("CREATE TABLE output (foo INT, id INT, lagfoo INT) WITH ('connector' = 'print')")
{code}
Using bounded over windows, everything works as expected:
{code:python}
t = t_env.sql_query("""
    SELECT foo, id, avg(foo) OVER w AS lagfoo
    FROM datagen
    WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)""")
t.execute_insert("output")
{code}
yields
{code:python}
+I(0,0,0)
+I(1,1,1)
+I(2,0,1)
+I(3,1,2)
+I(4,0,3)
...
{code}
If we change the window to unbounded preceding:
{code:python}
t = t_env.sql_query("""
    SELECT foo, id, avg(foo) OVER w AS lagfoo
    FROM datagen
    WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)""")
t.execute_insert("output")
{code}
we lose all rows with id == 1:
{code:python}
+I(0,0,0)
+I(2,0,1)
+I(4,0,2)
+I(6,0,3)
+I(8,0,4)
...
{code}
I observed this problem with various aggregate functions, under both 1.11.2 and 1.12rc1.
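
For comparison, here is a minimal plain-Python sketch (no Flink involved) of what the unbounded window would be expected to emit if both partitions were kept. The function name expected_unbounded_avg is hypothetical. The sketch assumes the average is truncated to an integer, as the INT results above suggest, and that rows are emitted in order of foo; the actual interleaving of rows that share a timestamp may differ.

```python
from collections import defaultdict


def expected_unbounded_avg(start=0, end=19):
    """Emulate avg(foo) over an unbounded-preceding window partitioned by id."""
    sums = defaultdict(int)    # running sum of foo per partition
    counts = defaultdict(int)  # running row count per partition
    rows = []
    for foo in range(start, end + 1):
        key = foo % 2  # mirrors the computed column: id AS mod(foo, 2)
        sums[key] += foo
        counts[key] += 1
        # Integer division is an assumption based on the INT values in the report.
        rows.append((foo, key, sums[key] // counts[key]))
    return rows


# Print the first few expected rows in the same style as the print connector.
for foo, key, running_avg in expected_unbounded_avg()[:6]:
    print(f"+I({foo},{key},{running_avg})")
```

Under these assumptions, the rows for id == 0 match the output reported above, and the rows for id == 1, such as (1,1,1), (3,1,2) and (5,1,3), are the ones missing from it.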