I'm trying to calculate a simple rolling average using PyFlink, but somehow the last rows streaming in seem to be excluded. I suspected this was a result of data arriving out of order; however, I fail to understand why.
# Rolling average of `stw` per `sender` over the preceding 50 seconds of event
# time. Reads Kafka topic `dms_ts`, writes the raw projection to `temp_sink1`
# and the projection plus window aggregates to `temp_sink2`.
#
# `kafka_servers`, `kafka_zookeeper_servers`, `kafka_consumer_group_id`,
# `StreamExecutionEnvironment`, `TimeCharacteristic`, `EnvironmentSettings`,
# `StreamTableEnvironment`, `Over`, `expr` and `DataTypes` must be defined or
# imported before this snippet.
#
# NOTE(review) on the "missing last rows": per the Flink docs, an event-time
# OVER aggregation emits a row only once the watermark has reached that row's
# `rt`, and discards rows whose `rt` is already behind the watermark when they
# arrive. The watermark declared below trails the largest `rt` seen by a source
# subtask by WATERMARK_DELAY, and a downstream operator follows the minimum
# watermark of its inputs. When events stop arriving, the watermark stops
# advancing, so the most recent rows stay buffered until newer events push it
# forward. To flush them in a test, send a few more events with later
# timestamps. If some Kafka partitions go quiet while others keep producing,
# see the `table.exec.source.idle-timeout` option.

exec_env = StreamExecutionEnvironment.get_execution_environment()
# Event time is already the default from Flink 1.12 on; kept for older versions.
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .use_blink_planner()
    .build()
)
table_env = StreamTableEnvironment.create(exec_env, environment_settings=env_settings)

config = table_env.get_config().get_configuration()
config.set_string(
    "pipeline.jars",
    "file:///Users/jag002/flinkjars/flink-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
    "file:///Users/jag002/flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
    "file:///Users/jag002/flinkjars/kafka-clients-2.7.0.jar",
)
# Replaces the old trailing `table_env.execute("ark_demo")`, which only named
# the job. NOTE(review): confirm your Flink version honours `pipeline.name`
# for Table API jobs.
config.set_string("pipeline.name", "ark_demo")


def _kafka_with_clause(topic, servers, zookeeper, group_id):
    """Return the options of a legacy Kafka ``WITH (...)`` clause for *topic*.

    Every table in this job uses the same brokers, ZooKeeper quorum, consumer
    group, startup mode and JSON format; only the topic differs.
    """
    return f"""
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = '{topic}',
        'connector.properties.bootstrap.servers' = '{servers}',
        'connector.properties.zookeeper.connect' = '{zookeeper}',
        'connector.properties.group.id' = '{group_id}',
        'connector.startup-mode' = 'latest-offset',
        'format.type' = 'json'
    """


kafka_args = (kafka_servers, kafka_zookeeper_servers, kafka_consumer_group_id)

# How far the watermark trails the largest event time seen. Rows arriving
# later than this behind the watermark are dropped by the OVER window.
WATERMARK_DELAY = "INTERVAL '1' SECOND"

source_with = _kafka_with_clause("dms_ts", *kafka_args)
source_ts_ddl = f"""
    CREATE TABLE kafka_ts_source (
        sender VARCHAR,
        length DOUBLE,
        rot DOUBLE,
        sog DOUBLE,
        stw DOUBLE,
        x_time STRING,
        rt AS TO_TIMESTAMP(x_time),
        WATERMARK FOR rt AS rt - {WATERMARK_DELAY}
    ) WITH ({source_with})
"""
table_env.execute_sql(source_ts_ddl)

calculate_table = table_env.sql_query("SELECT sender, stw, rt FROM kafka_ts_source")

temp1_with = _kafka_with_clause("temp_sink1", *kafka_args)
temp1_ddl = f"""
    CREATE TABLE kafka_temp1_sink (
        sender VARCHAR,
        stw DOUBLE,
        rt TIMESTAMP(3)
    ) WITH ({temp1_with})
"""
table_env.execute_sql(temp1_ddl)

# The window is applied to `calculate_table` itself: projecting it down to
# `sender, stw` first would drop `rt`, which the window orders by and the
# select below reads.
averages_table = (
    calculate_table
    .over_window(
        Over.partition_by("sender")
        .order_by("rt")
        .preceding(expr.lit(50).seconds)
        .following("CURRENT_RANGE")
        .alias("w")
    )
    .select(
        calculate_table.sender.alias("sender"),
        calculate_table.rt.alias("rt"),
        calculate_table.stw.alias("stw"),
        calculate_table.stw.avg.over(expr.col("w")).alias("avgstw"),
        calculate_table.rt.min.over(expr.col("w")).cast(DataTypes.TIMESTAMP(3)).alias("minrt"),
        calculate_table.rt.max.over(expr.col("w")).cast(DataTypes.TIMESTAMP(3)).alias("maxrt"),
    )
)

temp2_with = _kafka_with_clause("temp_sink2", *kafka_args)
temp2_ddl = f"""
    CREATE TABLE kafka_temp2_sink (
        sender VARCHAR,
        rt TIMESTAMP(3),
        stw DOUBLE,
        avgstw DOUBLE,
        minrt TIMESTAMP(3),
        maxrt TIMESTAMP(3)
    ) WITH ({temp2_with})
"""
table_env.execute_sql(temp2_ddl)

# Submit both inserts as ONE job, instead of two separate execute_insert()
# jobs where the first was never awaited. The old trailing
# `table_env.execute(...)` is gone: execute_insert/execute already submit the
# job, and a later execute() with nothing buffered fails.
statement_set = table_env.create_statement_set()
statement_set.add_insert("kafka_temp1_sink", calculate_table)
statement_set.add_insert("kafka_temp2_sink", averages_table)
# Blocks until the job ends; an unbounded Kafka source does not end by itself.
statement_set.execute().wait()

# my input is 44 events: index sender stw rt 0 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000 2021-02-18T08:55:19Z 1 3a1aca88-a31b-4bcc-82a0-629f58136570 0.200000 2021-02-18T08:55:27Z 2 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000 2021-02-18T09:05:13Z 3 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000 2021-02-18T08:54:18Z 4 ae6d0b4c-83c9-474a-8915-94288d433167 0.200000 2021-02-18T08:55:41Z 5 c3519487-8e6f-4603-b88c-17fd687baa6b 0.200000 2021-02-18T08:55:48Z 6 c7b8692c-d233-4fa1-9432-8f477458ca24 0.200000 2021-02-18T08:55:50Z 7 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000 2021-02-18T08:55:49Z 8 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000 2021-02-18T09:05:43Z 9 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000 2021-02-18T08:54:48Z 10 b760d3ec-f46c-4aad-b7fe-5934914ab948 16.790001 2021-02-18T08:56:18Z 11 c7b8692c-d233-4fa1-9432-8f477458ca24 0.200000 2021-02-18T08:56:21Z 12 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2.500000
2021-02-18T08:56:26Z 13 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000 2021-02-18T08:56:19Z 14 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000 2021-02-18T09:06:14Z 15 98c892ef-a2ba-493d-a861-0ccdecd882e0 0.200000 2021-02-18T08:56:36Z 16 6e452179-12e3-4826-9c2c-15fdde20e5e5 0.100000 2021-02-18T08:56:36Z 17 c3519487-8e6f-4603-b88c-17fd687baa6b 0.200000 2021-02-18T08:56:48Z 18 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000 2021-02-18T08:56:49Z 19 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000 2021-02-18T09:06:43Z 20 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000 2021-02-18T08:55:48Z 21 b760d3ec-f46c-4aad-b7fe-5934914ab948 16.889999 2021-02-18T08:57:19Z 22 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000 2021-02-18T09:07:14Z 23 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000 2021-02-18T08:56:19Z 24 c7b8692c-d233-4fa1-9432-8f477458ca24 0.200000 2021-02-18T08:57:51Z 25 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000 2021-02-18T08:57:50Z 26 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000 2021-02-18T08:56:49Z 27 6e452179-12e3-4826-9c2c-15fdde20e5e5 0.200000 2021-02-18T08:58:06Z 28 c3519487-8e6f-4603-b88c-17fd687baa6b 0.200000 2021-02-18T08:58:18Z 29 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2.500000 2021-02-18T08:58:27Z 30 3a1aca88-a31b-4bcc-82a0-629f58136570 0.200000 2021-02-18T08:58:26Z 31 ce37f455-6db4-4103-a497-426ed5be14b4 0.200000 2021-02-18T08:58:34Z 32 98c892ef-a2ba-493d-a861-0ccdecd882e0 0.100000 2021-02-18T08:58:35Z 33 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2.500000 2021-02-18T08:58:57Z 34 52586a23-02a1-4cad-9599-dd9f42752e24 0.060000 2021-02-18T08:58:54Z 35 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000 2021-02-18T08:58:49Z 36 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000 2021-02-18T09:08:45Z 37 3a1aca88-a31b-4bcc-82a0-629f58136570 0.100000 2021-02-18T08:58:56Z 38 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000 2021-02-18T08:57:49Z 39 98c892ef-a2ba-493d-a861-0ccdecd882e0 0.100000 2021-02-18T08:59:06Z 40 ae6d0b4c-83c9-474a-8915-94288d433167 0.200000 2021-02-18T08:59:11Z 41 
6e452179-12e3-4826-9c2c-15fdde20e5e5 0.100000 2021-02-18T08:59:06Z 42 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2.500000 2021-02-18T08:59:26Z 43 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000 2021-02-18T08:59:20Z The output when manually joined to the input set shows the last rows are all not accounted for: sender rt stw stw_r avgstw minrt maxrt diff 0 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:55:19Z 3.000000 3.000000 3.000000 2021-02-18T08:55:19Z 2021-02-18T08:55:19Z NaN 1 3a1aca88-a31b-4bcc-82a0-629f58136570 2021-02-18T08:55:27Z 0.200000 0.200000 0.200000 2021-02-18T08:55:27Z 2021-02-18T08:55:27Z 0.0 2 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:05:13Z 1.000000 1.000000 1.000000 2021-02-18T09:05:13Z 2021-02-18T09:05:13Z 9.0 3 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:54:18Z 6.900000 6.900000 6.900000 2021-02-18T08:54:18Z 2021-02-18T08:54:18Z -11.0 4 ae6d0b4c-83c9-474a-8915-94288d433167 2021-02-18T08:55:41Z 0.200000 0.200000 0.200000 2021-02-18T08:55:41Z 2021-02-18T08:55:41Z 1.0 5 c3519487-8e6f-4603-b88c-17fd687baa6b 2021-02-18T08:55:48Z 0.200000 0.200000 0.200000 2021-02-18T08:55:48Z 2021-02-18T08:55:48Z 0.0 6 c7b8692c-d233-4fa1-9432-8f477458ca24 2021-02-18T08:55:50Z 0.200000 0.200000 0.200000 2021-02-18T08:55:50Z 2021-02-18T08:55:50Z 0.0 7 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:55:49Z 3.000000 3.000000 3.000000 2021-02-18T08:55:49Z 2021-02-18T08:55:49Z -1.0 8 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:05:43Z 1.000000 1.000000 1.000000 2021-02-18T09:05:13Z 2021-02-18T09:05:43Z 9.0 9 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:54:48Z 6.900000 6.900000 6.900000 2021-02-18T08:54:48Z 2021-02-18T08:54:48Z -11.0 10 b760d3ec-f46c-4aad-b7fe-5934914ab948 2021-02-18T08:56:18Z 16.790001 16.790001 16.790001 2021-02-18T08:56:18Z 2021-02-18T08:56:18Z 1.0 11 c7b8692c-d233-4fa1-9432-8f477458ca24 2021-02-18T08:56:21Z 0.200000 0.200000 0.200000 2021-02-18T08:56:21Z 2021-02-18T08:56:21Z 0.0 12 a68c8bc5-366c-4590-af0e-e9d7248f8b12 
2021-02-18T08:56:26Z 2.500000 2.500000 2.500000 2021-02-18T08:56:26Z 2021-02-18T08:56:26Z 0.0 13 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:56:19Z 3.000000 3.000000 3.000000 2021-02-18T08:56:19Z 2021-02-18T08:56:19Z -1.0 14 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:06:14Z 1.000000 1.000000 1.000000 2021-02-18T09:05:43Z 2021-02-18T09:06:14Z 9.0 15 98c892ef-a2ba-493d-a861-0ccdecd882e0 2021-02-18T08:56:36Z 0.200000 0.200000 0.200000 2021-02-18T08:56:36Z 2021-02-18T08:56:36Z -10.0 16 6e452179-12e3-4826-9c2c-15fdde20e5e5 2021-02-18T08:56:36Z 0.100000 0.100000 0.100000 2021-02-18T08:56:36Z 2021-02-18T08:56:36Z 0.0 17 c3519487-8e6f-4603-b88c-17fd687baa6b 2021-02-18T08:56:48Z 0.200000 0.200000 0.200000 2021-02-18T08:56:48Z 2021-02-18T08:56:48Z 0.0 18 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:56:49Z 3.000000 3.000000 3.000000 2021-02-18T08:56:49Z 2021-02-18T08:56:49Z 0.0 19 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:06:43Z 1.000000 1.000000 1.000000 2021-02-18T09:06:14Z 2021-02-18T09:06:43Z 9.0 20 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:55:48Z 6.900000 6.900000 6.900000 2021-02-18T08:55:48Z 2021-02-18T08:55:48Z -11.0 21 b760d3ec-f46c-4aad-b7fe-5934914ab948 2021-02-18T08:57:19Z 16.889999 16.889999 16.889999 2021-02-18T08:57:19Z 2021-02-18T08:57:19Z 1.0 22 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:07:14Z 1.000000 1.000000 1.000000 2021-02-18T09:06:43Z 2021-02-18T09:07:14Z 9.0 23 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:56:19Z 6.900000 6.900000 6.900000 2021-02-18T08:56:19Z 2021-02-18T08:56:19Z -11.0 24 c7b8692c-d233-4fa1-9432-8f477458ca24 2021-02-18T08:57:51Z 0.200000 0.200000 0.200000 2021-02-18T08:57:51Z 2021-02-18T08:57:51Z 1.0 25 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:57:50Z 3.000000 3.000000 3.000000 2021-02-18T08:57:50Z 2021-02-18T08:57:50Z -1.0 26 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:56:49Z 6.900000 6.900000 6.900000 2021-02-18T08:56:19Z 2021-02-18T08:56:49Z -2.0 27 
6e452179-12e3-4826-9c2c-15fdde20e5e5 2021-02-18T08:58:06Z 0.200000 0.200000 0.200000 2021-02-18T08:58:06Z 2021-02-18T08:58:06Z 1.0 28 c3519487-8e6f-4603-b88c-17fd687baa6b 2021-02-18T08:58:18Z 0.200000 0.200000 0.200000 2021-02-18T08:58:18Z 2021-02-18T08:58:18Z 0.0 29 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2021-02-18T08:58:27Z 2.500000 2.500000 2.500000 2021-02-18T08:58:27Z 2021-02-18T08:58:27Z 0.0 30 3a1aca88-a31b-4bcc-82a0-629f58136570 2021-02-18T08:58:26Z 0.200000 0.200000 0.200000 2021-02-18T08:58:26Z 2021-02-18T08:58:26Z -1.0 31 ce37f455-6db4-4103-a497-426ed5be14b4 2021-02-18T08:58:34Z 0.200000 0.200000 0.200000 2021-02-18T08:58:34Z 2021-02-18T08:58:34Z 0.0 32 98c892ef-a2ba-493d-a861-0ccdecd882e0 2021-02-18T08:58:35Z 0.100000 0.100000 0.100000 2021-02-18T08:58:35Z 2021-02-18T08:58:35Z 0.0 33 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2021-02-18T08:58:57Z 2.500000 2.500000 2.500000 2021-02-18T08:58:27Z 2021-02-18T08:58:57Z 0.0 34 52586a23-02a1-4cad-9599-dd9f42752e24 2021-02-18T08:58:54Z 0.060000 0.060000 0.060000 2021-02-18T08:58:54Z 2021-02-18T08:58:54Z -1.0 35 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:58:49Z 3.000000 3.000000 3.000000 2021-02-18T08:58:49Z 2021-02-18T08:58:49Z -1.0 36 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:08:45Z 1.000000 NaN NaN NaN NaN 9.0 37 3a1aca88-a31b-4bcc-82a0-629f58136570 2021-02-18T08:58:56Z 0.100000 NaN NaN NaN NaN -10.0 38 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:57:49Z 6.900000 NaN NaN NaN NaN -2.0 39 98c892ef-a2ba-493d-a861-0ccdecd882e0 2021-02-18T08:59:06Z 0.100000 NaN NaN NaN NaN 1.0 40 ae6d0b4c-83c9-474a-8915-94288d433167 2021-02-18T08:59:11Z 0.200000 NaN NaN NaN NaN 0.0 41 6e452179-12e3-4826-9c2c-15fdde20e5e5 2021-02-18T08:59:06Z 0.100000 NaN NaN NaN NaN -1.0 42 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2021-02-18T08:59:26Z 2.500000 NaN NaN NaN NaN 0.0 43 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:59:20Z 3.000000 NaN NaN NaN NaN -1.0 Can anybody enlighten me? 
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/