FYI, I've responded to this on Stack Overflow: https://stackoverflow.com/questions/68715430/apache-flink-accessing-keyed-state-from-late-window
On Mon, Aug 9, 2021 at 3:16 AM suman shil <cncf.s...@gmail.com> wrote:

> I am writing a Flink application that consumes time-series data from a
> Kafka topic. Each time-series record has a metric name, tag key/value
> pairs, a timestamp, and a value. I have created a tumbling window to
> aggregate data by a metric key (a combination of the metric name, the
> key/value pairs, and the timestamp). Here is what the main stream looks like:
>
> Kafka source -> Flat Map that parses and emits Metric -> Key by metric key
> -> Tumbling window of 60 seconds -> Aggregate the data -> Write to the sink
>
> I also want to check whether any metric arrived late, outside the above
> window. I want to count how many metrics arrived late and calculate the
> percentage of late metrics relative to the original metrics. I am thinking
> of using Flink's "allowedLateness" feature to send the late metrics to a
> different stream. I am planning to add a "MapState" to the main "Aggregate
> the data" operator, whose key is the metric key and whose value is the
> count of metrics that arrived within the main window.
>
> Kafka source -> Flat Map that parses and emits Metric -> Key by metric key
> -> Tumbling window of 60 seconds -> Aggregate the data (maintain a map
> state of metric counts) -> Write to the sink
>      \
>       \
>        Late data -> Key by metric key -> Collect late metrics and find
>        the percentage of late metrics -> Write the result to the sink
>
> My question is: can the "Collect late metrics and find the percentage of
> late metrics" operator access the "MapState" that is updated by the main
> stream? Even though both are keyed by the same metric key, I guess they
> are two different tasks. I want to calculate (number of late metrics /
> (number of late metrics + number of metrics that arrived on time)).
>
> Thanks
> Suman
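For illustration only, here is the per-key accounting behind that ratio, written as plain Java rather than Flink API code. In Flink, keyed state is scoped to a single operator, so a separate late-data operator cannot read the window operator's MapState even when both use the same key; one option is to keep both counts in one place, which is what this sketch models. It assumes 60-second tumbling event-time windows, zero allowed lateness, and a single watermark that trails the largest timestamp seen by a fixed bound and advances after every event. In a real job, late records would usually be routed to a separate stream with `sideOutputLateData`, and watermarks are generated periodically rather than per event. The class name, method names, and example metric key are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java model (not Flink API code) of per-key late-metric
 * accounting. Assumptions: 60 s tumbling event-time windows, zero allowed
 * lateness, and one watermark that trails the largest timestamp seen so far
 * by a fixed bound and advances after every event.
 */
class LateMetricRatio {
    static final long WINDOW_MS = 60_000L;

    private final long outOfOrdernessMs;
    private long watermark = Long.MIN_VALUE;
    // Both counters live in one map, keyed by metric key: {onTime, late}.
    private final Map<String, long[]> counts = new HashMap<>();

    LateMetricRatio(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
    }

    /** Classifies one event, updates its key's counters, and returns true if it is late. */
    boolean record(String metricKey, long timestampMs) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
        long windowEnd = windowStart + WINDOW_MS;
        // The window has already fired once the watermark reaches its last timestamp (end - 1).
        boolean late = windowEnd - 1 <= watermark;
        long[] c = counts.computeIfAbsent(metricKey, k -> new long[2]);
        c[late ? 1 : 0]++;
        // Advance the watermark after processing this event.
        watermark = Math.max(watermark, timestampMs - outOfOrdernessMs - 1);
        return late;
    }

    /** late / (late + onTime) for one key, or 0.0 if the key was never seen. */
    double lateRatio(String metricKey) {
        long[] c = counts.get(metricKey);
        if (c == null) {
            return 0.0;
        }
        long total = c[0] + c[1];
        return total == 0 ? 0.0 : (double) c[1] / total;
    }

    public static void main(String[] args) {
        LateMetricRatio tracker = new LateMetricRatio(5_000L);
        String key = "cpu.load|host=a";     // hypothetical metric key
        tracker.record(key, 10_000L);       // window [0, 60000), on time
        tracker.record(key, 70_000L);       // window [60000, 120000); watermark -> 64999
        tracker.record(key, 30_000L);       // window [0, 60000) already closed: late
        tracker.record(key, 68_000L);       // window [60000, 120000) still open
        System.out.println(tracker.lateRatio(key));
    }
}
```

Running `main` prints 0.25: the event at 30000 ms is counted as late because the watermark (64999 ms) has already passed the end of its window [0, 60000), and the other three events are on time.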