I'm trying to calculate a simple rolling average using PyFlink, but the last
rows streaming in seem to be excluded from the output. I suspected this is
caused by data arriving out of order, but I fail to understand why.
# Streaming execution environment with the Blink planner, plus the Kafka
# connector jars the job needs, passed via `pipeline.jars`.
exec_env = StreamExecutionEnvironment.get_execution_environment()
# NOTE: deprecated since Flink 1.12, where event time is already the default;
# kept for compatibility with older Flink versions.
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .use_blink_planner()
    .build()
)
table_env = StreamTableEnvironment.create(exec_env, environment_settings=env_settings)

# `pipeline.jars` takes a ';'-separated list of jar URLs.
# NOTE(review): flink-sql-connector-kafka is normally used on its own for SQL /
# PyFlink jobs (it is a fat jar). Shipping it together with
# flink-connector-kafka and kafka-clients may cause classpath conflicts --
# confirm which jars are actually needed.
kafka_jars = [
    "file:///Users/jag002/flinkjars/flink-connector-kafka_2.11-1.13-SNAPSHOT.jar",
    "file:///Users/jag002/flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar",
    "file:///Users/jag002/flinkjars/kafka-clients-2.7.0.jar",
]
table_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(kafka_jars))
# Kafka source table for the `dms_ts` topic.
# `rt` is a computed column parsed from the `x_time` string. It is declared as
# the event-time attribute, with the watermark trailing `rt` by one second.
#
# NOTE(review): event-time operators downstream (the OVER window) emit a row
# only once the watermark has passed that row's `rt`. The newest rows therefore
# stay buffered until later events advance the watermark. If `dms_ts` has
# several partitions, the watermark is the minimum across them, so an idle
# partition can hold it back. See `table.exec.source.idle-timeout` -- TODO
# confirm the partition layout.
# NOTE(review): TO_TIMESTAMP without a format argument parses
# 'yyyy-MM-dd HH:mm:ss' by default -- confirm this matches the x_time payload.
source_ts_ddl = f"""
CREATE TABLE kafka_ts_source (
sender VARCHAR,
length DOUBLE,
rot DOUBLE,
sog DOUBLE,
stw DOUBLE,
x_time STRING,
rt as TO_TIMESTAMP(x_time),
WATERMARK FOR rt as rt - INTERVAL '1' second
) with (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'dms_ts',
'connector.properties.bootstrap.servers' = '{kafka_servers}',
'connector.properties.zookeeper.connect' =
'{kafka_zookeeper_servers}',
'connector.properties.group.id' = '{kafka_consumer_group_id}',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
)
"""
# execute_sql replaces sql_update (deprecated since Flink 1.11); DDL
# statements are executed immediately.
table_env.execute_sql(source_ts_ddl)
# Project the columns the downstream steps use. `rt` must be kept: it is the
# event-time attribute that the OVER window orders by.
calculate_table = table_env.sql_query(
    "SELECT\n"
    "sender,\n"
    "stw,\n"
    "rt\n"
    "FROM kafka_ts_source\n"
)
# Sink table for the projected rows (sender, stw, rt), written as JSON to the
# `temp_sink1` Kafka topic.
temp1_ddl = f"""
CREATE TABLE kafka_temp1_sink (
sender VARCHAR,
stw DOUBLE,
rt TIMESTAMP(3)
) with (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'temp_sink1',
'connector.properties.bootstrap.servers' =
'{kafka_servers}',
'connector.properties.zookeeper.connect' =
'{kafka_zookeeper_servers}',
'connector.properties.group.id' =
'{kafka_consumer_group_id}',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
)
"""
# execute_sql replaces sql_update (deprecated since Flink 1.11).
table_env.execute_sql(temp1_ddl)
# Submits its own job and returns without waiting on it; the returned
# TableResult is intentionally not kept.
calculate_table.execute_insert("kafka_temp1_sink")
# Rolling average of `stw` per `sender` over the preceding 50 seconds of event
# time (`rt`), plus the min/max `rt` covered by each window.
#
# The window is applied to `calculate_table` itself, not to a "sender, stw"
# projection: the window orders by `rt` and the select below reads `rt`, so
# `rt` must still be present in the windowed table.
#
# NOTE(review): an event-time OVER aggregation does not emit a row when it
# arrives. Each row is held in state and emitted only once the watermark
# (derived from `rt - INTERVAL '1' second` in kafka_ts_source) has passed its
# `rt`. The most recent rows therefore remain buffered until newer events
# advance the watermark -- the likely reason the last input rows are missing
# from kafka_temp2_sink. Rows that arrive with `rt` already behind the
# watermark are dropped as late.
averages_table = (
    calculate_table
    .over_window(
        Over.partition_by("sender")
        .order_by("rt")
        .preceding(expr.lit(50).seconds)
        .following("CURRENT_RANGE")
        .alias("w")
    )
    .select(
        calculate_table.sender.alias('sender'),
        calculate_table.rt.alias('rt'),
        calculate_table.stw.alias('stw'),
        calculate_table.stw.avg.over(expr.col('w')).alias('avgstw'),
        # Cast to TIMESTAMP(3) to match the minrt/maxrt columns of
        # kafka_temp2_sink.
        calculate_table.rt.min.over(expr.col('w')).cast(DataTypes.TIMESTAMP(3)).alias('minrt'),
        calculate_table.rt.max.over(expr.col('w')).cast(DataTypes.TIMESTAMP(3)).alias('maxrt'),
    )
)
# Sink table for the rolling-average results, written as JSON to the
# `temp_sink2` Kafka topic. Columns match the select in averages_table.
temp2_ddl = f"""
CREATE TABLE kafka_temp2_sink (
sender VARCHAR,
rt TIMESTAMP(3),
stw DOUBLE,
avgstw DOUBLE,
minrt TIMESTAMP(3),
maxrt TIMESTAMP(3)
) with (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'temp_sink2',
'connector.properties.bootstrap.servers' =
'{kafka_servers}',
'connector.properties.zookeeper.connect' =
'{kafka_zookeeper_servers}',
'connector.properties.group.id' =
'{kafka_consumer_group_id}',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
)
"""
# execute_sql replaces sql_update (deprecated since Flink 1.11).
table_env.execute_sql(temp2_ddl)
# Submit the rolling-average job and block on it. With an unbounded Kafka
# source, wait() returns only when the job terminates.
# No `table_env.execute(...)` follows: execute_insert/execute_sql submit their
# jobs themselves and buffer nothing, so TableEnvironment.execute() would have
# no operators to run and would raise.
averages_table.execute_insert("kafka_temp2_sink").wait()
My input consists of 44 events:
index sender stw rt
0 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000
2021-02-18T08:55:19Z
1 3a1aca88-a31b-4bcc-82a0-629f58136570 0.200000
2021-02-18T08:55:27Z
2 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000
2021-02-18T09:05:13Z
3 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000
2021-02-18T08:54:18Z
4 ae6d0b4c-83c9-474a-8915-94288d433167 0.200000
2021-02-18T08:55:41Z
5 c3519487-8e6f-4603-b88c-17fd687baa6b 0.200000
2021-02-18T08:55:48Z
6 c7b8692c-d233-4fa1-9432-8f477458ca24 0.200000
2021-02-18T08:55:50Z
7 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000
2021-02-18T08:55:49Z
8 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000
2021-02-18T09:05:43Z
9 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000
2021-02-18T08:54:48Z
10 b760d3ec-f46c-4aad-b7fe-5934914ab948 16.790001
2021-02-18T08:56:18Z
11 c7b8692c-d233-4fa1-9432-8f477458ca24 0.200000
2021-02-18T08:56:21Z
12 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2.500000
2021-02-18T08:56:26Z
13 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000
2021-02-18T08:56:19Z
14 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000
2021-02-18T09:06:14Z
15 98c892ef-a2ba-493d-a861-0ccdecd882e0 0.200000
2021-02-18T08:56:36Z
16 6e452179-12e3-4826-9c2c-15fdde20e5e5 0.100000
2021-02-18T08:56:36Z
17 c3519487-8e6f-4603-b88c-17fd687baa6b 0.200000
2021-02-18T08:56:48Z
18 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000
2021-02-18T08:56:49Z
19 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000
2021-02-18T09:06:43Z
20 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000
2021-02-18T08:55:48Z
21 b760d3ec-f46c-4aad-b7fe-5934914ab948 16.889999
2021-02-18T08:57:19Z
22 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000
2021-02-18T09:07:14Z
23 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000
2021-02-18T08:56:19Z
24 c7b8692c-d233-4fa1-9432-8f477458ca24 0.200000
2021-02-18T08:57:51Z
25 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000
2021-02-18T08:57:50Z
26 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000
2021-02-18T08:56:49Z
27 6e452179-12e3-4826-9c2c-15fdde20e5e5 0.200000
2021-02-18T08:58:06Z
28 c3519487-8e6f-4603-b88c-17fd687baa6b 0.200000
2021-02-18T08:58:18Z
29 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2.500000
2021-02-18T08:58:27Z
30 3a1aca88-a31b-4bcc-82a0-629f58136570 0.200000
2021-02-18T08:58:26Z
31 ce37f455-6db4-4103-a497-426ed5be14b4 0.200000
2021-02-18T08:58:34Z
32 98c892ef-a2ba-493d-a861-0ccdecd882e0 0.100000
2021-02-18T08:58:35Z
33 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2.500000
2021-02-18T08:58:57Z
34 52586a23-02a1-4cad-9599-dd9f42752e24 0.060000
2021-02-18T08:58:54Z
35 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000
2021-02-18T08:58:49Z
36 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 1.000000
2021-02-18T09:08:45Z
37 3a1aca88-a31b-4bcc-82a0-629f58136570 0.100000
2021-02-18T08:58:56Z
38 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 6.900000
2021-02-18T08:57:49Z
39 98c892ef-a2ba-493d-a861-0ccdecd882e0 0.100000
2021-02-18T08:59:06Z
40 ae6d0b4c-83c9-474a-8915-94288d433167 0.200000
2021-02-18T08:59:11Z
41 6e452179-12e3-4826-9c2c-15fdde20e5e5 0.100000
2021-02-18T08:59:06Z
42 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2.500000
2021-02-18T08:59:26Z
43 da2fe388-390d-41d7-982b-2a15dc990f06 3.000000
2021-02-18T08:59:20Z
When I manually join the output to the input set, none of the last rows are
accounted for:
sender rt stw stw_r avgstw minrt maxrt diff
0 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:55:19Z 3.000000
3.000000 3.000000 2021-02-18T08:55:19Z 2021-02-18T08:55:19Z
NaN
1 3a1aca88-a31b-4bcc-82a0-629f58136570 2021-02-18T08:55:27Z 0.200000
0.200000 0.200000 2021-02-18T08:55:27Z 2021-02-18T08:55:27Z
0.0
2 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:05:13Z 1.000000
1.000000 1.000000 2021-02-18T09:05:13Z 2021-02-18T09:05:13Z
9.0
3 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:54:18Z 6.900000
6.900000 6.900000 2021-02-18T08:54:18Z 2021-02-18T08:54:18Z
-11.0
4 ae6d0b4c-83c9-474a-8915-94288d433167 2021-02-18T08:55:41Z 0.200000
0.200000 0.200000 2021-02-18T08:55:41Z 2021-02-18T08:55:41Z
1.0
5 c3519487-8e6f-4603-b88c-17fd687baa6b 2021-02-18T08:55:48Z 0.200000
0.200000 0.200000 2021-02-18T08:55:48Z 2021-02-18T08:55:48Z
0.0
6 c7b8692c-d233-4fa1-9432-8f477458ca24 2021-02-18T08:55:50Z 0.200000
0.200000 0.200000 2021-02-18T08:55:50Z 2021-02-18T08:55:50Z
0.0
7 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:55:49Z 3.000000
3.000000 3.000000 2021-02-18T08:55:49Z 2021-02-18T08:55:49Z
-1.0
8 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:05:43Z 1.000000
1.000000 1.000000 2021-02-18T09:05:13Z 2021-02-18T09:05:43Z
9.0
9 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:54:48Z 6.900000
6.900000 6.900000 2021-02-18T08:54:48Z 2021-02-18T08:54:48Z
-11.0
10 b760d3ec-f46c-4aad-b7fe-5934914ab948 2021-02-18T08:56:18Z
16.790001
16.790001 16.790001 2021-02-18T08:56:18Z 2021-02-18T08:56:18Z
1.0
11 c7b8692c-d233-4fa1-9432-8f477458ca24 2021-02-18T08:56:21Z 0.200000
0.200000 0.200000 2021-02-18T08:56:21Z 2021-02-18T08:56:21Z
0.0
12 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2021-02-18T08:56:26Z 2.500000
2.500000 2.500000 2021-02-18T08:56:26Z 2021-02-18T08:56:26Z
0.0
13 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:56:19Z 3.000000
3.000000 3.000000 2021-02-18T08:56:19Z 2021-02-18T08:56:19Z
-1.0
14 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:06:14Z 1.000000
1.000000 1.000000 2021-02-18T09:05:43Z 2021-02-18T09:06:14Z
9.0
15 98c892ef-a2ba-493d-a861-0ccdecd882e0 2021-02-18T08:56:36Z 0.200000
0.200000 0.200000 2021-02-18T08:56:36Z 2021-02-18T08:56:36Z
-10.0
16 6e452179-12e3-4826-9c2c-15fdde20e5e5 2021-02-18T08:56:36Z 0.100000
0.100000 0.100000 2021-02-18T08:56:36Z 2021-02-18T08:56:36Z
0.0
17 c3519487-8e6f-4603-b88c-17fd687baa6b 2021-02-18T08:56:48Z 0.200000
0.200000 0.200000 2021-02-18T08:56:48Z 2021-02-18T08:56:48Z
0.0
18 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:56:49Z 3.000000
3.000000 3.000000 2021-02-18T08:56:49Z 2021-02-18T08:56:49Z
0.0
19 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:06:43Z 1.000000
1.000000 1.000000 2021-02-18T09:06:14Z 2021-02-18T09:06:43Z
9.0
20 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:55:48Z 6.900000
6.900000 6.900000 2021-02-18T08:55:48Z 2021-02-18T08:55:48Z
-11.0
21 b760d3ec-f46c-4aad-b7fe-5934914ab948 2021-02-18T08:57:19Z
16.889999
16.889999 16.889999 2021-02-18T08:57:19Z 2021-02-18T08:57:19Z
1.0
22 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:07:14Z 1.000000
1.000000 1.000000 2021-02-18T09:06:43Z 2021-02-18T09:07:14Z
9.0
23 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:56:19Z 6.900000
6.900000 6.900000 2021-02-18T08:56:19Z 2021-02-18T08:56:19Z
-11.0
24 c7b8692c-d233-4fa1-9432-8f477458ca24 2021-02-18T08:57:51Z 0.200000
0.200000 0.200000 2021-02-18T08:57:51Z 2021-02-18T08:57:51Z
1.0
25 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:57:50Z 3.000000
3.000000 3.000000 2021-02-18T08:57:50Z 2021-02-18T08:57:50Z
-1.0
26 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:56:49Z 6.900000
6.900000 6.900000 2021-02-18T08:56:19Z 2021-02-18T08:56:49Z
-2.0
27 6e452179-12e3-4826-9c2c-15fdde20e5e5 2021-02-18T08:58:06Z 0.200000
0.200000 0.200000 2021-02-18T08:58:06Z 2021-02-18T08:58:06Z
1.0
28 c3519487-8e6f-4603-b88c-17fd687baa6b 2021-02-18T08:58:18Z 0.200000
0.200000 0.200000 2021-02-18T08:58:18Z 2021-02-18T08:58:18Z
0.0
29 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2021-02-18T08:58:27Z 2.500000
2.500000 2.500000 2021-02-18T08:58:27Z 2021-02-18T08:58:27Z
0.0
30 3a1aca88-a31b-4bcc-82a0-629f58136570 2021-02-18T08:58:26Z 0.200000
0.200000 0.200000 2021-02-18T08:58:26Z 2021-02-18T08:58:26Z
-1.0
31 ce37f455-6db4-4103-a497-426ed5be14b4 2021-02-18T08:58:34Z 0.200000
0.200000 0.200000 2021-02-18T08:58:34Z 2021-02-18T08:58:34Z
0.0
32 98c892ef-a2ba-493d-a861-0ccdecd882e0 2021-02-18T08:58:35Z 0.100000
0.100000 0.100000 2021-02-18T08:58:35Z 2021-02-18T08:58:35Z
0.0
33 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2021-02-18T08:58:57Z 2.500000
2.500000 2.500000 2021-02-18T08:58:27Z 2021-02-18T08:58:57Z
0.0
34 52586a23-02a1-4cad-9599-dd9f42752e24 2021-02-18T08:58:54Z 0.060000
0.060000 0.060000 2021-02-18T08:58:54Z 2021-02-18T08:58:54Z
-1.0
35 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:58:49Z 3.000000
3.000000 3.000000 2021-02-18T08:58:49Z 2021-02-18T08:58:49Z
-1.0
36 4c7d0d79-cf48-4df7-a590-dcfacca03ee4 2021-02-18T09:08:45Z
1.000000 NaN
NaN NaN NaN 9.0
37 3a1aca88-a31b-4bcc-82a0-629f58136570 2021-02-18T08:58:56Z
0.100000 NaN
NaN NaN NaN -10.0
38 1d2e4d88-c3a0-423c-bd3e-74e27a43636b 2021-02-18T08:57:49Z
6.900000 NaN
NaN NaN NaN -2.0
39 98c892ef-a2ba-493d-a861-0ccdecd882e0 2021-02-18T08:59:06Z
0.100000 NaN
NaN NaN NaN 1.0
40 ae6d0b4c-83c9-474a-8915-94288d433167 2021-02-18T08:59:11Z
0.200000 NaN
NaN NaN NaN 0.0
41 6e452179-12e3-4826-9c2c-15fdde20e5e5 2021-02-18T08:59:06Z
0.100000 NaN
NaN NaN NaN -1.0
42 a68c8bc5-366c-4590-af0e-e9d7248f8b12 2021-02-18T08:59:26Z
2.500000 NaN
NaN NaN NaN 0.0
43 da2fe388-390d-41d7-982b-2a15dc990f06 2021-02-18T08:59:20Z
3.000000 NaN
NaN NaN NaN -1.0
Can anybody enlighten me?
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/