Apparently, I didn't make that clear enough in my first mail, so thanks for clarifying that, Theo.
As Matthias wrote, the general solution is to use a (Keyed)ProcessFunction [1]. However, if OP uses watermarks, chances are high that OP uses them for windows, so I wanted to point out the intended way.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

On Fri, Sep 25, 2020 at 9:27 AM Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:

> Hi Arvid,
>
> be aware that allowedLateness will only be applied when your job uses
> windowing. If you have late events and you only apply mapFunctions
> like enrichment, as far as I know, the events won't be filtered out
> automatically.
>
> Best regards
> Theo
>
> ------------------------------
> *From: *"Arvid Heise" <ar...@ververica.com>
> *To: *"Matthias Pohl" <matth...@ververica.com>
> *CC: *"Ori Popowski" <ori....@gmail.com>, "user" <user@flink.apache.org>
> *Sent: *Friday, 25 September 2020 07:58:40
> *Subject: *Re: How can I drop events which are late by more than X
> hours/days?
>
> Hi Ori,
>
> if you use windows, Flink already has a solution on board: allowed
> lateness [1].
>
> By default, Flink filters out all late records (record timestamp < current
> watermark). You can set the allowed lateness to X so that these elements
> are still processed.
>
> If you end up treating all late events as normal events and you don't fire
> when the watermark comes (e.g., you want to avoid late firings), you
> probably just want to adjust your watermark strategy to reflect X and keep
> allowed lateness at 0. That is, if your current watermark strategy is
> watermark = max(record timestamp), you can just go with
> watermark = max(record timestamp) - X.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#allowed-lateness
>
> On Thu, Sep 24, 2020 at 12:14 PM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Ori,
>> one way to do it is to implement a basic ProcessFunction.
>> ProcessFunction.processElement(I value, Context ctx, Collector<O> out)
>> gives you access to the context, through which you can read the current
>> watermark timestamp via ctx.timerService().currentWatermark(). You can
>> use that to filter out delayed events.
>>
>> Best,
>> Matthias
>>
>> On Thu, Sep 24, 2020 at 9:59 AM Ori Popowski <ori....@gmail.com> wrote:
>>
>>> I need to drop elements which are delayed by more than a certain amount
>>> of time from the current watermark.
>>>
>>> I wanted to create a FilterFunction where I get the current watermark,
>>> and if the difference between the watermark and my element's timestamp is
>>> greater than X, drop the element.
>>>
>>> However, I do not have access to the current watermark inside any of
>>> Flink's operators/functions, including FilterFunction.
>>>
>>> How can such functionality be achieved?
>>>
>>
>>
>> --
>>
>> Matthias Pohl | Engineer
>>
>> Follow us @VervericaData Ververica <https://www.ververica.com/>
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
>> Wehner
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>
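To make the filtering rule from Matthias's and Arvid's replies concrete, here is a minimal sketch in plain Java, assuming event-time timestamps and X are both in milliseconds. It deliberately has no Flink dependency so it runs standalone. Inside a real ProcessFunction, the two inputs would come from ctx.timestamp() and ctx.timerService().currentWatermark(). The names `LatenessFilterSketch`, `isTooLate` and `keepNotTooLate` are hypothetical helpers, not Flink APIs. The simulated watermark simply tracks the maximum timestamp seen so far, which is the "watermark = max(record timestamp)" strategy mentioned in the thread. Before the first watermark arrives, Flink reports Long.MIN_VALUE as the current watermark, so the sketch guards against the overflow in Long.MIN_VALUE - X.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "drop events that are late by more than X" (no Flink dependency).
// All names are hypothetical; in a Flink ProcessFunction, eventTs would come from
// ctx.timestamp() and currentWatermark from ctx.timerService().currentWatermark().
public class LatenessFilterSketch {

    // Value a Flink operator reports as its watermark before any watermark has arrived.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // True if the event trails the current watermark by more than maxLatenessMs.
    static boolean isTooLate(long eventTs, long currentWatermark, long maxLatenessMs) {
        if (currentWatermark == NO_WATERMARK) {
            // Nothing can be late yet; also avoids overflow in Long.MIN_VALUE - maxLatenessMs.
            return false;
        }
        return eventTs < currentWatermark - maxLatenessMs;
    }

    // Simulates a stream whose watermark is the maximum timestamp seen so far.
    static List<Long> keepNotTooLate(long[] timestamps, long maxLatenessMs) {
        List<Long> kept = new ArrayList<>();
        long watermark = NO_WATERMARK;
        for (long ts : timestamps) {
            // The check uses the watermark as it was before this element arrived.
            if (!isTooLate(ts, watermark, maxLatenessMs)) {
                kept.add(ts);
            }
            // Simplification: advance the watermark after every element.
            watermark = Math.max(watermark, ts);
        }
        return kept;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000, 5_000, 3_500, 10_000, 6_000, 8_500};
        // With X = 2 s, only 6_000 is dropped: it arrives when the watermark is 10_000.
        System.out.println(keepNotTooLate(timestamps, 2_000)); // prints [1000, 5000, 3500, 10000, 8500]
    }
}
```

Dropping an event only when it trails the watermark by more than X is the same condition as Arvid's alternative of lowering the watermark itself by X and dropping whatever falls behind it. In an actual Flink job, watermarks are typically emitted periodically rather than after every element, so the exact set of dropped events may differ from this simplified simulation.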