Hi Florin,

I concur with Dian. If you have any other questions, please do not
hesitate to ask.

Best,

Dawid

On 13/12/2018 03:37, fudian.fd wrote:
> Hi Florin,
>
> Are you using processing time or event time? The JIRA FLINK-7384
> allows to emit timed-out patterns without having to wait for the next
> element ONLY in processing time. For event time, it still needs the
> watermark to trigger the emitting of matched or timed-out patterns.
> Besides, the watermark is global for the whole job, not per key. In
> your application, SNMP will not send anymore data for one device when
> it's disconnected. But if there are many devices to be monitored, the
> watermark will still progress as long as there are some devices
> connected. If it's the case that there are no elements at all, the
> mentioned assigner will look like the following:
>
> class PeriodicExtractor implements AssignerWithPeriodicWatermarks<Long> {
>
>    private volatile long maxTimestamp = Long.MIN_VALUE;
>
>    private static final long MAX_IDLE = 5 * 60 * 1000; // 5 minutes private 
> static final long OFFSET = 30 * 1000;    // 30 seconds private volatile long 
> lastTimeReceivedElement = 0L;
>
>    @Override public long extractTimestamp(Long element, long 
> previousElementTimestamp) {
>       maxTimestamp = Math.max(maxTimestamp, element);
>       lastTimeReceivedElement = System.currentTimeMillis();
>       return element;
>    }
>
>    @Nullable @Override public Watermark getCurrentWatermark() {
>       if (System.currentTimeMillis() - lastTimeReceivedElement >= MAX_IDLE) {
>          maxTimestamp = Math.max(maxTimestamp, System.currentTimeMillis() - 
> OFFSET);
>       }
>       return new Watermark(maxTimestamp);
>    }
> }
>
> Regards,
> Dian
>
>> 在 2018年12月13日,上午12:06,Andrey Zagrebin <and...@data-artisans.com
>> <mailto:and...@data-artisans.com>> 写道:
>>
>> Hi Florin,
>>
>> I think Dawid might help you. I am pulling him into the discussion.
>>
>> Best,
>> Andrey
>>
>>> On 12 Dec 2018, at 16:24, Spico Florin <spicoflo...@gmail.com
>>> <mailto:spicoflo...@gmail.com>> wrote:
>>>
>>> Hello!
>>> I'm using the same questions as in this stackoverflow
>>> post 
>>> https://stackoverflow.com/questions/50356981/apache-flink-cep-how-to-detect-if-event-did-not-occur-within-x-seconds?rq=1,
>>> due to fact that I need the same functionality.
>>>
>>> My use case is to detect when an device is disconnected from a
>>> server (for example).
>>> From SNMP I'm sending the data to Kafka then  to Flink CEP and check
>>> whether I have the established connection within X seconds.
>>> If the window of X seconds timedout, then I can consider that the
>>> device is disconnected.
>>> The issue is that, when the device is disconnected, SNMP does't send
>>> anymore data about that port that was used. Thus no event is sent to
>>> CEP. Only when the device is connected again then, the disconnected
>>> event is triggered.
>>>
>>> The same functionality was requested in this post
>>> too: 
>>> https://stackoverflow.com/questions/51399973/flink-time-window-based-on-event-time-output-nothing.
>>>
>>> I checked the status of the jira ticket regarding this feature:
>>>
>>> https://issues.apache.org/jira/browse/FLINK-7384
>>>
>>> but unfortunately, is still open.
>>>
>>> In the official
>>> documentation, 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html,
>>> in the paragraph related with Idling sources, somehow the situation
>>> described resembles what we need:
>>> "Currently, with pure event time watermarks generators, watermarks
>>> can not progress if there are no elements to be processed. That
>>> means in case of gap in the 
>>> incoming data, event time will not progress and for example the
>>> window operator will not be triggered and thus existing windows will
>>> not be able to produce any output data.
>>> To circumvent this one can use periodic watermark assigners that
>>> don’t only assign based on element timestamps. An example solution
>>> could be 
>>> an assigner that switches to using current processing time as the
>>> time basis after not observing new events for a while."
>>>
>>> Unfortunately, no example of how the mentioned assigner looks like.
>>> A full example will help us to check whether the mentioned solution
>>> works or not.
>>>
>>> Therefore, can you please share your thoughts about this feature? 
>>> Is feasible? If yes,please provide the Flink version that solve it
>>> and the example.
>>> Is still a limitation and there is no plan to solve it in near future?
>>>
>>> I look forward for your answers.
>>>
>>> Best regards,
>>> Florin
>>>
>>>
>>
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to