I am writing a Flink application that consumes time series data from a Kafka topic. Each data point has a metric name, tag key-value pairs, a timestamp, and a value. I have created a tumbling window to aggregate the data by a metric key (a combination of metric name, key-value pairs, and timestamp). Here is what the main stream looks like:

Kafka source -> Flat Map which parses and emits Metric -> Key by metric key -> Tumbling window of 60 seconds -> Aggregate the data -> Write to the sink

I also want to check whether any metrics arrive late, outside the above window. I want to count how many metrics arrived late and calculate the percentage of late metrics relative to the original metrics. I am thinking of using the "allowedLateness" feature of Flink to send the late metrics to a different stream. I am planning to add a "MapState" in the main "Aggregate the data" operator, with the metric key as the key and the count of metrics that arrived in the main window as the value.

Kafka source -> Flat Map which parses and emits Metric -> Key by metric key -> Tumbling window of 60 seconds -> Aggregate the data (maintain a map state of metric count) -> Write to the sink
    \
     \
      Late data -> Key by metric key -> Collect late metrics and find the percentage of late metrics -> Write the result to the sink

My question: can the "Collect late metrics and find the percentage of late metrics" operator access the "MapState" that was updated by the main stream? Even though both are keyed by the same metric key, I guess they are two different tasks. I want to calculate (number of late metrics / (number of late metrics + number of metrics arrived on time)).

Thanks,
Suman
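
P.S. To make the calculation concrete, here is a minimal plain-Java sketch of the ratio I want per metric key. It does not use any Flink API. The class name LateMetricStats, its method names, and the sample key are hypothetical, and it assumes both counts are visible in one place, which is exactly the part I am unsure about in Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: tracks on-time and late counts per metric key.
class LateMetricStats {
    // counts[0] = metrics that arrived on time, counts[1] = metrics that arrived late
    private final Map<String, long[]> countsByKey = new HashMap<>();

    // Count one metric that arrived inside its window.
    void recordOnTime(String metricKey) {
        countsByKey.computeIfAbsent(metricKey, k -> new long[2])[0]++;
    }

    // Count one metric that arrived after its window had closed.
    void recordLate(String metricKey) {
        countsByKey.computeIfAbsent(metricKey, k -> new long[2])[1]++;
    }

    // late / (late + onTime); returns 0.0 if nothing was recorded for the key.
    double lateFraction(String metricKey) {
        long[] counts = countsByKey.get(metricKey);
        if (counts == null) {
            return 0.0;
        }
        long total = counts[0] + counts[1];
        return total == 0 ? 0.0 : (double) counts[1] / total;
    }

    public static void main(String[] args) {
        LateMetricStats stats = new LateMetricStats();
        String key = "cpu.load|host=a"; // made-up metric key for illustration
        for (int i = 0; i < 8; i++) {
            stats.recordOnTime(key);
        }
        for (int i = 0; i < 2; i++) {
            stats.recordLate(key);
        }
        // 2 late out of 10 total; multiply by 100 for a percentage
        System.out.println(stats.lateFraction(key));
    }
}
```

In the real job the two counters would be updated by different operators, so the open question is where a shared structure like this could live.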