Hi Ori,

If you use windows, Flink already has a built-in solution: allowed
lateness [1].

By default, Flink's window operators drop all late records (record
timestamp < current watermark). If you set the allowed lateness to X,
those elements are still processed.
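
To make the rule a bit more concrete, here is a small dependency-free
sketch of how I understand the lateness check for tumbling event-time
windows: a record is dropped once the watermark has passed the last
timestamp of its window plus the allowed lateness. The class and method
names (WindowLateness, windowStart, isDropped) are made up for
illustration and are not Flink API; in a real job you would simply call
allowedLateness(...) on the windowed stream as described in [1].

```java
// Plain-Java model of the window lateness check; NOT Flink code.
// WindowLateness, windowStart and isDropped are hypothetical names.
final class WindowLateness {
    private WindowLateness() {}

    // Start of the tumbling window (no offset) that contains the timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // True if the record's window is already past its cleanup time,
    // i.e. (last timestamp of window + allowed lateness) <= watermark.
    static boolean isDropped(long timestampMs, long windowSizeMs,
                             long allowedLatenessMs, long watermarkMs) {
        long maxTimestamp = windowStart(timestampMs, windowSizeMs) + windowSizeMs - 1;
        return maxTimestamp + allowedLatenessMs <= watermarkMs;
    }
}
```

For example, with 1000 ms windows and a watermark of 2500, a record with
timestamp 1500 is dropped when the allowed lateness is 0, but still
processed when the allowed lateness is 1000.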

If you end up treating all late events as normal events and you don't fire
when the watermark comes (e.g., you want to avoid late firings), you
probably just want to adjust your watermark strategy to reflect X and keep
allowed lateness at 0. That is, if your current watermark strategy is
watermark = max(record timestamp), you can just go with
watermark = max(record timestamp) - X.
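
As a rough illustration of the adjusted strategy, the following plain-Java
sketch tracks the maximum timestamp seen so far and derives the watermark
by subtracting X. It is only a model of the formula above, not Flink code:
BoundedDelayWatermark and its methods are hypothetical names. In Flink
1.11+ the built-in WatermarkStrategy.forBoundedOutOfOrderness(Duration)
should give you essentially this behavior.

```java
// Plain-Java model of watermark = max(record timestamp) - X; NOT Flink code.
// BoundedDelayWatermark and its methods are hypothetical names.
final class BoundedDelayWatermark {
    private final long maxDelayMs;   // the "X" from the discussion
    private long maxTimestampMs;     // largest record timestamp seen so far

    BoundedDelayWatermark(long maxDelayMs) {
        if (maxDelayMs < 0) {
            throw new IllegalArgumentException("maxDelayMs must be >= 0");
        }
        this.maxDelayMs = maxDelayMs;
        // Start high enough that the subtraction below cannot underflow.
        this.maxTimestampMs = Long.MIN_VALUE + maxDelayMs;
    }

    // Call once per record, in arrival order.
    void observe(long timestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
    }

    // The watermark trails the largest timestamp by X and never moves back.
    long currentWatermark() {
        return maxTimestampMs - maxDelayMs;
    }

    // A record is late only if it is more than X behind the largest timestamp.
    boolean isLate(long timestampMs) {
        return timestampMs < currentWatermark();
    }
}
```

With X = 5, after records with timestamps 10 and 20 the watermark is 15,
so a record with timestamp 12 counts as late while one with timestamp 15
does not.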

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#allowed-lateness

On Thu, Sep 24, 2020 at 12:14 PM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Ori,
> one way to do it is to implement a basic ProcessFunction.
> ProcessFunction.processElement(I value, Context ctx, Collector<O> out)
> gives you access to the context, through which you can read the current
> watermark timestamp using ctx.timerService().currentWatermark(). You can
> use that to filter out delayed events.
>
> Best,
> Matthias
>
> On Thu, Sep 24, 2020 at 9:59 AM Ori Popowski <ori....@gmail.com> wrote:
>
>> I need to drop elements which are delayed by more than a certain amount
>> of time from the current watermark.
>>
>> I wanted to create a FilterFunction where I get the current watermark,
>> and if the difference between the watermark and my element's timestamp is
>> greater than X - drop the element.
>>
>> However, I do not have access to the current watermark inside any of
>> Flink's operators/functions including FilterFunction.
>>
>> How can such functionality be achieved?
>>
>
>
> --
>
> Matthias Pohl | Engineer
>
> Follow us @VervericaData Ververica <https://www.ververica.com/>
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>


-- 

Arvid Heise | Senior Java Developer

