Apparently I didn't make that clear enough in my first mail, so thanks
for clarifying, Theo.

As Matthias wrote, the general solution is to use (Keyed)ProcessFunction
[1]. However, if OP uses watermarks, chances are high that OP uses them for
windows, so I wanted to point out the intended way.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
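
For the (Keyed)ProcessFunction route, the check itself is small. Below is a
minimal plain-Java sketch of that comparison, not Flink API code: the class
and method names (LatenessFilterSketch, isTooLate, keepTimely) are made up
for illustration, and the watermark is passed in as a plain long. In a real
job the same comparison would sit inside processElement, with the watermark
read from ctx.timerService().currentWatermark() as Matthias described.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the lateness check; no Flink classes are used here.
final class LatenessFilterSketch {

    // Hypothetical helper: true if the event is more than maxLatenessMs
    // behind the given watermark.
    static boolean isTooLate(long eventTimestampMs, long currentWatermarkMs, long maxLatenessMs) {
        // Assumption: Long.MIN_VALUE means "no watermark seen yet". Keep the
        // event in that case and avoid overflow in the subtraction below.
        if (currentWatermarkMs == Long.MIN_VALUE) {
            return false;
        }
        // Events ahead of the watermark yield a negative difference and are kept.
        return currentWatermarkMs - eventTimestampMs > maxLatenessMs;
    }

    // Hypothetical helper: keeps only the timestamps that are not too late.
    static List<Long> keepTimely(List<Long> timestampsMs, long currentWatermarkMs, long maxLatenessMs) {
        List<Long> kept = new ArrayList<>();
        for (long ts : timestampsMs) {
            if (!isTooLate(ts, currentWatermarkMs, maxLatenessMs)) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long watermarkMs = 5_000L; // illustrative value
        long xMs = 3_000L;         // the "X" from the question, in milliseconds
        List<Long> events = Arrays.asList(1_000L, 2_000L, 6_000L);
        System.out.println(keepTimely(events, watermarkMs, xMs)); // prints [2000, 6000]
    }
}
```

In this sketch an event that is exactly X behind the watermark is kept;
whether that bound should be inclusive is a choice for the job at hand.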

On Fri, Sep 25, 2020 at 9:27 AM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Arvid,
>
> be aware that allowedLateness is only applied when your job uses
> windowing. If you have late events and you only apply map functions such
> as enrichment, then as far as I know the events won't be filtered out
> automatically.
>
> Best regards
> Theo
>
> ------------------------------
> *From: *"Arvid Heise" <ar...@ververica.com>
> *To: *"Matthias Pohl" <matth...@ververica.com>
> *CC: *"Ori Popowski" <ori....@gmail.com>, "user" <user@flink.apache.org>
> *Sent: *Friday, 25 September 2020 07:58:40
> *Subject: *Re: How can I drop events which are late by more than X
> hours/days?
>
> Hi Ori,
>
> if you use windows, Flink already has a built-in solution: allowed
> lateness [1].
>
> By default, Flink filters all late records (record timestamp < current
> watermark). With allowed lateness set to X, elements that are late by up
> to X are still processed.
>
> If you end up treating all late events as normal events and you don't fire
> when the watermark comes (e.g., you want to avoid late firings), you
> probably just want to adjust your watermark strategy to reflect X and keep
> allowed lateness at 0. That is, if your current watermark strategy is
> watermark = max(record timestamp), you can just go with
> watermark = max(record timestamp) - X.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#allowed-lateness
>
> On Thu, Sep 24, 2020 at 12:14 PM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Ori,
>> one way to do it is to implement a basic ProcessFunction.
>> ProcessFunction.processElement(I value, Context ctx, Collector<O> out)
>> gives you a context through which you can read the current watermark
>> timestamp using ctx.timerService().currentWatermark(). You can use that
>> to filter out delayed events.
>>
>> Best,
>> Matthias
>>
>> On Thu, Sep 24, 2020 at 9:59 AM Ori Popowski <ori....@gmail.com> wrote:
>>
>>> I need to drop elements which are delayed by more than a certain amount
>>> of time from the current watermark.
>>>
>>> I wanted to create a FilterFunction where I get the current watermark,
>>> and if the difference between the watermark and my element's timestamp is
>>> greater than X - drop the element.
>>>
>>> However, I do not have access to the current watermark inside any of
>>> Flink's operators/functions including FilterFunction.
>>>
>>> How can such functionality be achieved?
>>>
>>
>>
>> --
>>
>> Matthias Pohl | Engineer
>>
>> Follow us @VervericaData Ververica <https://www.ververica.com/>
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
>> Wehner
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 

Arvid Heise | Senior Java Developer