I'm trying to calculate a simple rolling average using PyFlink, but somehow
the last rows streaming in seem to be excluded, which I assume is the
result of data arriving out of order. However, I fail to understand why.

# Rolling-average pipeline: Kafka source -> (raw copy sink, over-window averages sink).
# The imports and the kafka_servers / kafka_zookeeper_servers /
# kafka_consumer_group_id variables are not shown in this snippet.

# Streaming environment using event time and the Blink planner.
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .use_blink_planner()
    .build()
)
table_env = StreamTableEnvironment.create(
    exec_env, environment_settings=env_settings)

# Kafka connector jars, passed as one ';'-separated list.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///Users/jag002/flinkjars/flink-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
    "file:///Users/jag002/flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
    "file:///Users/jag002/flinkjars/kafka-clients-2.7.0.jar")

# Source table. `rt` is computed from the string column x_time and is the
# event-time attribute; the watermark trails rt by one second.
# (The rt / WATERMARK lines are the ones the author highlighted in the post.)
source_ts_ddl = f"""
                CREATE TABLE kafka_ts_source (
                        sender VARCHAR,
                        length DOUBLE,
                        rot DOUBLE,
                        sog DOUBLE,
                        stw DOUBLE,
                        x_time STRING,
                        rt as TO_TIMESTAMP(x_time),
                        WATERMARK FOR rt as rt - INTERVAL '1' second
                ) with (
                  'connector.type' = 'kafka',
                  'connector.version' = 'universal',
                  'connector.topic' = 'dms_ts',
                  'connector.properties.bootstrap.servers' = '{kafka_servers}',
                  'connector.properties.zookeeper.connect' = 
'{kafka_zookeeper_servers}',
                  'connector.properties.group.id' = '{kafka_consumer_group_id}',
                  'connector.startup-mode' = 'latest-offset',
                  'format.type' = 'json'
                )
                """
# NOTE(review): sql_update() is deprecated in newer PyFlink releases in
# favour of execute_sql() -- confirm against the Flink version in use.
table_env.sql_update(source_ts_ddl)

# Projection of the three columns used downstream.
calculate_table = table_env.sql_query("""SELECT 
        sender,
        stw,  
        rt
        FROM kafka_ts_source
        """)

# Sink 1: plain copy of the projected rows.
temp1_ddl = f"""
                        CREATE TABLE kafka_temp1_sink (
                                sender VARCHAR,
                                stw DOUBLE,
                                rt TIMESTAMP(3)
                        ) with (
                          'connector.type' = 'kafka',
                          'connector.version' = 'universal',
                          'connector.topic' = 'temp_sink1',
                          'connector.properties.bootstrap.servers' = 
'{kafka_servers}',
                          'connector.properties.zookeeper.connect' = 
'{kafka_zookeeper_servers}',
                          'connector.properties.group.id' = 
'{kafka_consumer_group_id}',
                          'connector.startup-mode' = 'latest-offset',
                          'format.type' = 'json'
                        )
        """
table_env.sql_update(temp1_ddl)
calculate_table.execute_insert("kafka_temp1_sink")

# Over window per sender, ordered by rt, covering the preceding 50 seconds
# up to the current range. For each row it emits the average stw plus the
# min/max rt seen in the window.
# (This statement is the one the author highlighted in the post.)
# NOTE(review): the first select() keeps only sender and stw, yet the window
# orders by rt and the final select() reads rt -- confirm rt is resolvable.
# NOTE(review): following() is given the string "CURRENT_RANGE"; confirm
# whether the expression constant is required on the version in use.
averages_table = (
    calculate_table.select("sender, stw")
    .over_window(
        Over.partition_by("sender").order_by("rt")
        .preceding(expr.lit(50).seconds)
        .following("CURRENT_RANGE").alias("w"))
    .select(
        calculate_table.sender.alias('sender'),
        calculate_table.rt.alias('rt'),
        calculate_table.stw.alias('stw'),
        calculate_table.stw.avg.over(expr.col('w')).alias('avgstw'),
        calculate_table.rt.min.over(expr.col('w'))
        .cast(DataTypes.TIMESTAMP(3)).alias('minrt'),
        calculate_table.rt.max.over(expr.col('w'))
        .cast(DataTypes.TIMESTAMP(3)).alias('maxrt'))
)

# Sink 2: windowed averages.
temp2_ddl = f"""
                        CREATE TABLE kafka_temp2_sink (
                                sender VARCHAR,
                                rt TIMESTAMP(3),
                                stw DOUBLE,
                                avgstw DOUBLE,
                                minrt TIMESTAMP(3),
                                maxrt TIMESTAMP(3)
                        ) with (
                          'connector.type' = 'kafka',
                          'connector.version' = 'universal',
                          'connector.topic' = 'temp_sink2',
                          'connector.properties.bootstrap.servers' = 
'{kafka_servers}',
                          'connector.properties.zookeeper.connect' = 
'{kafka_zookeeper_servers}',
                          'connector.properties.group.id' = 
'{kafka_consumer_group_id}',
                          'connector.startup-mode' = 'latest-offset',
                          'format.type' = 'json'
                        )
        """
table_env.sql_update(temp2_ddl)
averages_table.execute_insert("kafka_temp2_sink").wait()
# NOTE(review): both inserts above are already issued through
# execute_insert(); confirm this extra execute() call is still needed.
table_env.execute("ark_demo")

My input is 44 events:
index sender    stw     rt
0       da2fe388-390d-41d7-982b-2a15dc990f06    3.000000        
2021-02-18T08:55:19Z
1       3a1aca88-a31b-4bcc-82a0-629f58136570    0.200000        
2021-02-18T08:55:27Z
2       4c7d0d79-cf48-4df7-a590-dcfacca03ee4    1.000000        
2021-02-18T09:05:13Z
3       1d2e4d88-c3a0-423c-bd3e-74e27a43636b    6.900000        
2021-02-18T08:54:18Z
4       ae6d0b4c-83c9-474a-8915-94288d433167    0.200000        
2021-02-18T08:55:41Z
5       c3519487-8e6f-4603-b88c-17fd687baa6b    0.200000        
2021-02-18T08:55:48Z
6       c7b8692c-d233-4fa1-9432-8f477458ca24    0.200000        
2021-02-18T08:55:50Z
7       da2fe388-390d-41d7-982b-2a15dc990f06    3.000000        
2021-02-18T08:55:49Z
8       4c7d0d79-cf48-4df7-a590-dcfacca03ee4    1.000000        
2021-02-18T09:05:43Z
9       1d2e4d88-c3a0-423c-bd3e-74e27a43636b    6.900000        
2021-02-18T08:54:48Z
10      b760d3ec-f46c-4aad-b7fe-5934914ab948    16.790001       
2021-02-18T08:56:18Z
11      c7b8692c-d233-4fa1-9432-8f477458ca24    0.200000        
2021-02-18T08:56:21Z
12      a68c8bc5-366c-4590-af0e-e9d7248f8b12    2.500000        
2021-02-18T08:56:26Z
13      da2fe388-390d-41d7-982b-2a15dc990f06    3.000000        
2021-02-18T08:56:19Z
14      4c7d0d79-cf48-4df7-a590-dcfacca03ee4    1.000000        
2021-02-18T09:06:14Z
15      98c892ef-a2ba-493d-a861-0ccdecd882e0    0.200000        
2021-02-18T08:56:36Z
16      6e452179-12e3-4826-9c2c-15fdde20e5e5    0.100000        
2021-02-18T08:56:36Z
17      c3519487-8e6f-4603-b88c-17fd687baa6b    0.200000        
2021-02-18T08:56:48Z
18      da2fe388-390d-41d7-982b-2a15dc990f06    3.000000        
2021-02-18T08:56:49Z
19      4c7d0d79-cf48-4df7-a590-dcfacca03ee4    1.000000        
2021-02-18T09:06:43Z
20      1d2e4d88-c3a0-423c-bd3e-74e27a43636b    6.900000        
2021-02-18T08:55:48Z
21      b760d3ec-f46c-4aad-b7fe-5934914ab948    16.889999       
2021-02-18T08:57:19Z
22      4c7d0d79-cf48-4df7-a590-dcfacca03ee4    1.000000        
2021-02-18T09:07:14Z
23      1d2e4d88-c3a0-423c-bd3e-74e27a43636b    6.900000        
2021-02-18T08:56:19Z
24      c7b8692c-d233-4fa1-9432-8f477458ca24    0.200000        
2021-02-18T08:57:51Z
25      da2fe388-390d-41d7-982b-2a15dc990f06    3.000000        
2021-02-18T08:57:50Z
26      1d2e4d88-c3a0-423c-bd3e-74e27a43636b    6.900000        
2021-02-18T08:56:49Z
27      6e452179-12e3-4826-9c2c-15fdde20e5e5    0.200000        
2021-02-18T08:58:06Z
28      c3519487-8e6f-4603-b88c-17fd687baa6b    0.200000        
2021-02-18T08:58:18Z
29      a68c8bc5-366c-4590-af0e-e9d7248f8b12    2.500000        
2021-02-18T08:58:27Z
30      3a1aca88-a31b-4bcc-82a0-629f58136570    0.200000        
2021-02-18T08:58:26Z
31      ce37f455-6db4-4103-a497-426ed5be14b4    0.200000        
2021-02-18T08:58:34Z
32      98c892ef-a2ba-493d-a861-0ccdecd882e0    0.100000        
2021-02-18T08:58:35Z
33      a68c8bc5-366c-4590-af0e-e9d7248f8b12    2.500000        
2021-02-18T08:58:57Z
34      52586a23-02a1-4cad-9599-dd9f42752e24    0.060000        
2021-02-18T08:58:54Z
35      da2fe388-390d-41d7-982b-2a15dc990f06    3.000000        
2021-02-18T08:58:49Z
36      4c7d0d79-cf48-4df7-a590-dcfacca03ee4    1.000000        
2021-02-18T09:08:45Z
37      3a1aca88-a31b-4bcc-82a0-629f58136570    0.100000        
2021-02-18T08:58:56Z
38      1d2e4d88-c3a0-423c-bd3e-74e27a43636b    6.900000        
2021-02-18T08:57:49Z
39      98c892ef-a2ba-493d-a861-0ccdecd882e0    0.100000        
2021-02-18T08:59:06Z
40      ae6d0b4c-83c9-474a-8915-94288d433167    0.200000        
2021-02-18T08:59:11Z
41      6e452179-12e3-4826-9c2c-15fdde20e5e5    0.100000        
2021-02-18T08:59:06Z
42      a68c8bc5-366c-4590-af0e-e9d7248f8b12    2.500000        
2021-02-18T08:59:26Z
43      da2fe388-390d-41d7-982b-2a15dc990f06    3.000000        
2021-02-18T08:59:20Z

The output, when manually joined to the input set, shows that none of the
last rows are accounted for:
        sender  rt      stw     stw_r   avgstw  minrt   maxrt   diff
0       da2fe388-390d-41d7-982b-2a15dc990f06    2021-02-18T08:55:19Z    3.000000
3.000000        3.000000        2021-02-18T08:55:19Z    2021-02-18T08:55:19Z    
NaN
1       3a1aca88-a31b-4bcc-82a0-629f58136570    2021-02-18T08:55:27Z    0.200000
0.200000        0.200000        2021-02-18T08:55:27Z    2021-02-18T08:55:27Z    
0.0
2       4c7d0d79-cf48-4df7-a590-dcfacca03ee4    2021-02-18T09:05:13Z    1.000000
1.000000        1.000000        2021-02-18T09:05:13Z    2021-02-18T09:05:13Z    
9.0
3       1d2e4d88-c3a0-423c-bd3e-74e27a43636b    2021-02-18T08:54:18Z    6.900000
6.900000        6.900000        2021-02-18T08:54:18Z    2021-02-18T08:54:18Z    
-11.0
4       ae6d0b4c-83c9-474a-8915-94288d433167    2021-02-18T08:55:41Z    0.200000
0.200000        0.200000        2021-02-18T08:55:41Z    2021-02-18T08:55:41Z    
1.0
5       c3519487-8e6f-4603-b88c-17fd687baa6b    2021-02-18T08:55:48Z    0.200000
0.200000        0.200000        2021-02-18T08:55:48Z    2021-02-18T08:55:48Z    
0.0
6       c7b8692c-d233-4fa1-9432-8f477458ca24    2021-02-18T08:55:50Z    0.200000
0.200000        0.200000        2021-02-18T08:55:50Z    2021-02-18T08:55:50Z    
0.0
7       da2fe388-390d-41d7-982b-2a15dc990f06    2021-02-18T08:55:49Z    3.000000
3.000000        3.000000        2021-02-18T08:55:49Z    2021-02-18T08:55:49Z    
-1.0
8       4c7d0d79-cf48-4df7-a590-dcfacca03ee4    2021-02-18T09:05:43Z    1.000000
1.000000        1.000000        2021-02-18T09:05:13Z    2021-02-18T09:05:43Z    
9.0
9       1d2e4d88-c3a0-423c-bd3e-74e27a43636b    2021-02-18T08:54:48Z    6.900000
6.900000        6.900000        2021-02-18T08:54:48Z    2021-02-18T08:54:48Z    
-11.0
10      b760d3ec-f46c-4aad-b7fe-5934914ab948    2021-02-18T08:56:18Z    
16.790001
16.790001       16.790001       2021-02-18T08:56:18Z    2021-02-18T08:56:18Z    
1.0
11      c7b8692c-d233-4fa1-9432-8f477458ca24    2021-02-18T08:56:21Z    0.200000
0.200000        0.200000        2021-02-18T08:56:21Z    2021-02-18T08:56:21Z    
0.0
12      a68c8bc5-366c-4590-af0e-e9d7248f8b12    2021-02-18T08:56:26Z    2.500000
2.500000        2.500000        2021-02-18T08:56:26Z    2021-02-18T08:56:26Z    
0.0
13      da2fe388-390d-41d7-982b-2a15dc990f06    2021-02-18T08:56:19Z    3.000000
3.000000        3.000000        2021-02-18T08:56:19Z    2021-02-18T08:56:19Z    
-1.0
14      4c7d0d79-cf48-4df7-a590-dcfacca03ee4    2021-02-18T09:06:14Z    1.000000
1.000000        1.000000        2021-02-18T09:05:43Z    2021-02-18T09:06:14Z    
9.0
15      98c892ef-a2ba-493d-a861-0ccdecd882e0    2021-02-18T08:56:36Z    0.200000
0.200000        0.200000        2021-02-18T08:56:36Z    2021-02-18T08:56:36Z    
-10.0
16      6e452179-12e3-4826-9c2c-15fdde20e5e5    2021-02-18T08:56:36Z    0.100000
0.100000        0.100000        2021-02-18T08:56:36Z    2021-02-18T08:56:36Z    
0.0
17      c3519487-8e6f-4603-b88c-17fd687baa6b    2021-02-18T08:56:48Z    0.200000
0.200000        0.200000        2021-02-18T08:56:48Z    2021-02-18T08:56:48Z    
0.0
18      da2fe388-390d-41d7-982b-2a15dc990f06    2021-02-18T08:56:49Z    3.000000
3.000000        3.000000        2021-02-18T08:56:49Z    2021-02-18T08:56:49Z    
0.0
19      4c7d0d79-cf48-4df7-a590-dcfacca03ee4    2021-02-18T09:06:43Z    1.000000
1.000000        1.000000        2021-02-18T09:06:14Z    2021-02-18T09:06:43Z    
9.0
20      1d2e4d88-c3a0-423c-bd3e-74e27a43636b    2021-02-18T08:55:48Z    6.900000
6.900000        6.900000        2021-02-18T08:55:48Z    2021-02-18T08:55:48Z    
-11.0
21      b760d3ec-f46c-4aad-b7fe-5934914ab948    2021-02-18T08:57:19Z    
16.889999
16.889999       16.889999       2021-02-18T08:57:19Z    2021-02-18T08:57:19Z    
1.0
22      4c7d0d79-cf48-4df7-a590-dcfacca03ee4    2021-02-18T09:07:14Z    1.000000
1.000000        1.000000        2021-02-18T09:06:43Z    2021-02-18T09:07:14Z    
9.0
23      1d2e4d88-c3a0-423c-bd3e-74e27a43636b    2021-02-18T08:56:19Z    6.900000
6.900000        6.900000        2021-02-18T08:56:19Z    2021-02-18T08:56:19Z    
-11.0
24      c7b8692c-d233-4fa1-9432-8f477458ca24    2021-02-18T08:57:51Z    0.200000
0.200000        0.200000        2021-02-18T08:57:51Z    2021-02-18T08:57:51Z    
1.0
25      da2fe388-390d-41d7-982b-2a15dc990f06    2021-02-18T08:57:50Z    3.000000
3.000000        3.000000        2021-02-18T08:57:50Z    2021-02-18T08:57:50Z    
-1.0
26      1d2e4d88-c3a0-423c-bd3e-74e27a43636b    2021-02-18T08:56:49Z    6.900000
6.900000        6.900000        2021-02-18T08:56:19Z    2021-02-18T08:56:49Z    
-2.0
27      6e452179-12e3-4826-9c2c-15fdde20e5e5    2021-02-18T08:58:06Z    0.200000
0.200000        0.200000        2021-02-18T08:58:06Z    2021-02-18T08:58:06Z    
1.0
28      c3519487-8e6f-4603-b88c-17fd687baa6b    2021-02-18T08:58:18Z    0.200000
0.200000        0.200000        2021-02-18T08:58:18Z    2021-02-18T08:58:18Z    
0.0
29      a68c8bc5-366c-4590-af0e-e9d7248f8b12    2021-02-18T08:58:27Z    2.500000
2.500000        2.500000        2021-02-18T08:58:27Z    2021-02-18T08:58:27Z    
0.0
30      3a1aca88-a31b-4bcc-82a0-629f58136570    2021-02-18T08:58:26Z    0.200000
0.200000        0.200000        2021-02-18T08:58:26Z    2021-02-18T08:58:26Z    
-1.0
31      ce37f455-6db4-4103-a497-426ed5be14b4    2021-02-18T08:58:34Z    0.200000
0.200000        0.200000        2021-02-18T08:58:34Z    2021-02-18T08:58:34Z    
0.0
32      98c892ef-a2ba-493d-a861-0ccdecd882e0    2021-02-18T08:58:35Z    0.100000
0.100000        0.100000        2021-02-18T08:58:35Z    2021-02-18T08:58:35Z    
0.0
33      a68c8bc5-366c-4590-af0e-e9d7248f8b12    2021-02-18T08:58:57Z    2.500000
2.500000        2.500000        2021-02-18T08:58:27Z    2021-02-18T08:58:57Z    
0.0
34      52586a23-02a1-4cad-9599-dd9f42752e24    2021-02-18T08:58:54Z    0.060000
0.060000        0.060000        2021-02-18T08:58:54Z    2021-02-18T08:58:54Z    
-1.0
35      da2fe388-390d-41d7-982b-2a15dc990f06    2021-02-18T08:58:49Z    3.000000
3.000000        3.000000        2021-02-18T08:58:49Z    2021-02-18T08:58:49Z    
-1.0
36      4c7d0d79-cf48-4df7-a590-dcfacca03ee4    2021-02-18T09:08:45Z    
1.000000        NaN
NaN     NaN     NaN     9.0
37      3a1aca88-a31b-4bcc-82a0-629f58136570    2021-02-18T08:58:56Z    
0.100000        NaN
NaN     NaN     NaN     -10.0
38      1d2e4d88-c3a0-423c-bd3e-74e27a43636b    2021-02-18T08:57:49Z    
6.900000        NaN
NaN     NaN     NaN     -2.0
39      98c892ef-a2ba-493d-a861-0ccdecd882e0    2021-02-18T08:59:06Z    
0.100000        NaN
NaN     NaN     NaN     1.0
40      ae6d0b4c-83c9-474a-8915-94288d433167    2021-02-18T08:59:11Z    
0.200000        NaN
NaN     NaN     NaN     0.0
41      6e452179-12e3-4826-9c2c-15fdde20e5e5    2021-02-18T08:59:06Z    
0.100000        NaN
NaN     NaN     NaN     -1.0
42      a68c8bc5-366c-4590-af0e-e9d7248f8b12    2021-02-18T08:59:26Z    
2.500000        NaN
NaN     NaN     NaN     0.0
43      da2fe388-390d-41d7-982b-2a15dc990f06    2021-02-18T08:59:20Z    
3.000000        NaN
NaN     NaN     NaN     -1.0

Can anybody enlighten me?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to