Hello, Dian!

Thank you very much for your explanations. In my case, the CEP patterns are
based on event time. Also, as you said, I have many devices and many
ports on those devices, so I'm using keyed streams.
I would like to know which device was disconnected on which port.
Is that feasible in Flink? If yes, in which version and how?

Regards,
 Florin


On Thu, Dec 13, 2018 at 4:37 AM fudian.fd <fudian...@alibaba-inc.com> wrote:

> Hi Florin,
>
> Are you using processing time or event time? The change in JIRA FLINK-7384
> allows timed-out patterns to be emitted without waiting for the next element,
> but ONLY in processing time. For event time, a watermark is still needed to
> trigger the emission of matched or timed-out patterns. Besides, the watermark
> is global for the whole job, not per key. In your application, SNMP will not
> send any more data for a device once it is disconnected. But if there are
> many devices being monitored, the watermark will still progress as long as
> some devices are connected. If there are no elements at all, the mentioned
> assigner could look like the following:
>
> import javax.annotation.Nullable;
>
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
>
> class PeriodicExtractor implements AssignerWithPeriodicWatermarks<Long> {
>
>    // Highest event timestamp seen so far; emitted as the watermark.
>    private volatile long maxTimestamp = Long.MIN_VALUE;
>
>    private static final long MAX_IDLE = 5 * 60 * 1000; // 5 minutes
>    private static final long OFFSET = 30 * 1000;    // 30 seconds
>    // Wall-clock time at which the last element arrived.
>    private volatile long lastTimeReceivedElement = 0L;
>
>    @Override
>    public long extractTimestamp(Long element, long previousElementTimestamp) {
>       maxTimestamp = Math.max(maxTimestamp, element);
>       lastTimeReceivedElement = System.currentTimeMillis();
>       return element;
>    }
>
>    @Nullable
>    @Override
>    public Watermark getCurrentWatermark() {
>       // After MAX_IDLE without elements, fall back to processing time minus OFFSET.
>       if (System.currentTimeMillis() - lastTimeReceivedElement >= MAX_IDLE) {
>          maxTimestamp = Math.max(maxTimestamp, System.currentTimeMillis() - OFFSET);
>       }
>       return new Watermark(maxTimestamp);
>    }
> }
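
The idle-switching rule in the assigner above can be checked without a Flink cluster. What follows is a minimal plain-Java sketch of the same rule, not Flink API: the class name IdleAwareWatermark, its methods, and the explicit `now` and `startedAt` parameters are hypothetical and exist only so the clock can be controlled. The quoted assigner initialises lastTimeReceivedElement to 0L, which corresponds to passing startedAt = 0 here.

```java
/**
 * Plain-Java sketch of the idle-aware watermark rule (no Flink dependency).
 * All names are hypothetical; wall-clock time is passed in explicitly so
 * the behaviour is deterministic.
 */
class IdleAwareWatermark {
    private final long maxIdleMillis;   // idle time before falling back to wall-clock time
    private final long offsetMillis;    // safety margin subtracted from wall-clock time
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastElementSeenAt;

    IdleAwareWatermark(long maxIdleMillis, long offsetMillis, long startedAt) {
        this.maxIdleMillis = maxIdleMillis;
        this.offsetMillis = offsetMillis;
        this.lastElementSeenAt = startedAt;
    }

    /** Records an element with event timestamp eventTs that arrived at wall-clock time now. */
    long onElement(long eventTs, long now) {
        maxTimestamp = Math.max(maxTimestamp, eventTs);
        lastElementSeenAt = now;
        return eventTs;
    }

    /** Returns the watermark that would be emitted at wall-clock time now. */
    long currentWatermark(long now) {
        if (now - lastElementSeenAt >= maxIdleMillis) {
            // Source looks idle: advance using wall-clock time minus the offset.
            maxTimestamp = Math.max(maxTimestamp, now - offsetMillis);
        }
        return maxTimestamp;
    }

    public static void main(String[] args) {
        IdleAwareWatermark wm = new IdleAwareWatermark(5 * 60 * 1000L, 30 * 1000L, 0L);
        wm.onElement(1_000L, 2_000L);
        System.out.println(wm.currentWatermark(60_000L));   // prints 1000 (not idle yet)
        System.out.println(wm.currentWatermark(302_000L));  // prints 272000 (idle for 5 minutes)
    }
}
```

Note that once the fallback has fired, the watermark never moves backwards, so elements arriving later with older timestamps would be treated as late.
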
>
>
> Regards,
> Dian
>
> On Dec 13, 2018, at 12:06 AM, Andrey Zagrebin <and...@data-artisans.com> wrote:
>
> Hi Florin,
>
> I think Dawid might help you. I am pulling him into the discussion.
>
> Best,
> Andrey
>
> On 12 Dec 2018, at 16:24, Spico Florin <spicoflo...@gmail.com> wrote:
>
> Hello!
> I have the same question as in this Stack Overflow post
> https://stackoverflow.com/questions/50356981/apache-flink-cep-how-to-detect-if-event-did-not-occur-within-x-seconds?rq=1,
> because I need the same functionality.
>
> My use case is to detect when a device is disconnected from a server (for
> example).
> Data goes from SNMP to Kafka and then to Flink CEP, where I check
> whether a connection has been established within X seconds.
> If the window of X seconds times out, I can consider the device
> disconnected.
> The issue is that when the device is disconnected, SNMP doesn't send
> any more data about the port that was in use, so no event reaches CEP.
> The disconnected event is triggered only when the device connects
> again.
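
The use case described above boils down to a per-key "last seen" timeout: remember when each (device, port) pair last reported, and flag the pairs that have been silent for X seconds. The following is a minimal plain-Java sketch of that bookkeeping only. It is not Flink code; the class PortSilenceDetector, its methods, and the "device:port" key format are hypothetical, and `now` stands for whatever time basis the caller supplies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: tracks the last event time per (device, port) key. */
class PortSilenceDetector {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    PortSilenceDetector(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records that the given port of the given device reported at eventTs. */
    void onEvent(String deviceId, int port, long eventTs) {
        // Keep the largest timestamp so out-of-order events do not move time backwards.
        lastSeen.merge(deviceId + ":" + port, eventTs, Long::max);
    }

    /** Returns the sorted keys that have been silent for at least the timeout as of now. */
    List<String> silentAt(long now) {
        List<String> silent = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (now - entry.getValue() >= timeoutMillis) {
                silent.add(entry.getKey());
            }
        }
        Collections.sort(silent);
        return silent;
    }

    public static void main(String[] args) {
        PortSilenceDetector detector = new PortSilenceDetector(10_000L);
        detector.onEvent("dev-1", 1, 1_000L);
        detector.onEvent("dev-1", 2, 9_000L);
        detector.onEvent("dev-2", 1, 12_000L);
        System.out.println(detector.silentAt(15_000L)); // prints [dev-1:1]
        System.out.println(detector.silentAt(20_000L)); // prints [dev-1:1, dev-1:2]
    }
}
```

The point of the sketch is that the check is driven by a clock rather than by the next event for the same key, which is exactly what is missing when a disconnected port stops sending data.
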
>
> The same functionality was requested in this post too:
> https://stackoverflow.com/questions/51399973/flink-time-window-based-on-event-time-output-nothing
> .
>
> I checked the status of the JIRA ticket for this feature:
>
> https://issues.apache.org/jira/browse/FLINK-7384
>
> but unfortunately it is still open.
>
> In the official documentation,
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html,
> the paragraph about idling sources describes a situation that resembles
> what we need:
> "Currently, with pure event time watermarks generators, watermarks can not
> progress if there are no elements to be processed. That means in case of
> gap in the incoming data, event time will not progress and for example the
> window operator will not be triggered and thus existing windows will not be
> able to produce any output data.
> To circumvent this one can use periodic watermark assigners that don’t
> only assign based on element timestamps. An example solution could be
> an assigner that switches to using current processing time as the time
> basis after not observing new events for a while."
>
> Unfortunately, there is no example of what the mentioned assigner looks like.
> A full example would help us check whether the mentioned solution works.
>
> Therefore, can you please share your thoughts about this feature?
> Is it feasible? If yes, please tell me which Flink version solves it and
> provide an example.
> Or is it still a limitation, with no plan to solve it in the near future?
>
> I look forward to your answers.
>
> Best regards,
> Florin
>
