I need to drop elements which are delayed by more than a certain amount of time from the current watermark.
I wanted to create a FilterFunction where I get the current watermark, and if the difference between the watermark and my element's timestamp is greater than X - drop the element. However, I do not have access to the current watermark inside any of Flink's operators/functions including FilterFunction. How can such functionality be achieved?