Hi Florin,

I concur with Dian. If you have any other questions, please do not hesitate to ask.

Best,
Dawid

On 13/12/2018 03:37, fudian.fd wrote:
> Hi Florin,
>
> Are you using processing time or event time? The JIRA FLINK-7384
> allows emitting timed-out patterns without having to wait for the next
> element ONLY in processing time. For event time, it still needs the
> watermark to trigger the emitting of matched or timed-out patterns.
> Besides, the watermark is global for the whole job, not per key. In
> your application, SNMP will not send any more data for one device when
> it's disconnected. But if there are many devices to be monitored, the
> watermark will still progress as long as there are some devices
> connected. If it's the case that there are no elements at all, the
> mentioned assigner will look like the following:
>
> import javax.annotation.Nullable;
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
>
> class PeriodicExtractor implements AssignerWithPeriodicWatermarks<Long> {
>
>     // Highest event timestamp seen so far; emitted as the watermark.
>     private volatile long maxTimestamp = Long.MIN_VALUE;
>
>     private static final long MAX_IDLE = 5 * 60 * 1000; // 5 minutes
>     private static final long OFFSET = 30 * 1000; // 30 seconds
>
>     // Wall-clock time at which the last element arrived.
>     private volatile long lastTimeReceivedElement = 0L;
>
>     @Override
>     public long extractTimestamp(Long element, long previousElementTimestamp) {
>         maxTimestamp = Math.max(maxTimestamp, element);
>         lastTimeReceivedElement = System.currentTimeMillis();
>         return element;
>     }
>
>     @Nullable
>     @Override
>     public Watermark getCurrentWatermark() {
>         // After MAX_IDLE without elements, fall back to processing time minus OFFSET.
>         if (System.currentTimeMillis() - lastTimeReceivedElement >= MAX_IDLE) {
>             maxTimestamp = Math.max(maxTimestamp, System.currentTimeMillis() - OFFSET);
>         }
>         return new Watermark(maxTimestamp);
>     }
> }
>
> Regards,
> Dian
>
>> On 13 Dec 2018, at 12:06 AM, Andrey Zagrebin <and...@data-artisans.com
>> <mailto:and...@data-artisans.com>> wrote:
>>
>> Hi Florin,
>>
>> I think Dawid might help you. I am pulling him into the discussion.
>>
>> Best,
>> Andrey
>>
>>> On 12 Dec 2018, at 16:24, Spico Florin <spicoflo...@gmail.com
>>> <mailto:spicoflo...@gmail.com>> wrote:
>>>
>>> Hello!
>>> I'm asking the same question as in this Stack Overflow post,
>>> https://stackoverflow.com/questions/50356981/apache-flink-cep-how-to-detect-if-event-did-not-occur-within-x-seconds?rq=1,
>>> because I need the same functionality.
>>>
>>> My use case is to detect when a device is disconnected from a
>>> server (for example).
>>> From SNMP I'm sending the data to Kafka, then to Flink CEP, and I check
>>> whether the connection is established within X seconds.
>>> If the window of X seconds times out, then I can consider that the
>>> device is disconnected.
>>> The issue is that, when the device is disconnected, SNMP doesn't send
>>> any more data about the port that was used. Thus no event is sent to
>>> CEP. Only when the device is connected again is the disconnected
>>> event triggered.
>>>
>>> The same functionality was requested in this post
>>> too:
>>> https://stackoverflow.com/questions/51399973/flink-time-window-based-on-event-time-output-nothing.
>>>
>>> I checked the status of the JIRA ticket regarding this feature:
>>>
>>> https://issues.apache.org/jira/browse/FLINK-7384
>>>
>>> but unfortunately, it is still open.
>>>
>>> In the official documentation,
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html,
>>> in the paragraph about idling sources, the situation
>>> described resembles what we need:
>>> "Currently, with pure event time watermarks generators, watermarks
>>> can not progress if there are no elements to be processed. That
>>> means in case of gap in the
>>> incoming data, event time will not progress and for example the
>>> window operator will not be triggered and thus existing windows will
>>> not be able to produce any output data.
>>> To circumvent this one can use periodic watermark assigners that
>>> don’t only assign based on element timestamps.
>>> An example solution could be
>>> an assigner that switches to using current processing time as the
>>> time basis after not observing new events for a while."
>>>
>>> Unfortunately, there is no example of what the mentioned assigner
>>> looks like. A full example would help us check whether the mentioned
>>> solution works or not.
>>>
>>> Therefore, can you please share your thoughts about this feature?
>>> Is it feasible? If yes, please provide the Flink version that solves
>>> it and an example.
>>> Or is it still a limitation, with no plan to solve it in the near future?
>>>
>>> I look forward to your answers.
>>>
>>> Best regards,
>>> Florin
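
Note on the PeriodicExtractor quoted above: its idle fallback can be checked without a Flink cluster. The following is a minimal, Flink-free sketch of the same watermark arithmetic, not the Flink API itself. The class name IdleAwareWatermark, the injected clock, and the methods onElement and currentWatermark are assumptions made for illustration; only MAX_IDLE and OFFSET are taken from the quoted code. One deliberate deviation: the idle timer starts at construction time rather than at 0L, so the processing-time fallback cannot fire before MAX_IDLE has elapsed.

```java
import java.util.function.LongSupplier;

// Flink-free model of the watermark arithmetic in the quoted PeriodicExtractor.
// All names here are hypothetical; only MAX_IDLE and OFFSET come from the thread.
final class IdleAwareWatermark {
    static final long MAX_IDLE = 5 * 60 * 1000L; // 5 minutes
    static final long OFFSET = 30 * 1000L;       // 30 seconds

    private final LongSupplier clock;            // assumption: injectable wall clock
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastTimeReceivedElement;

    IdleAwareWatermark(LongSupplier clock) {
        this.clock = clock;
        // Deviation from the quoted code, which initialises this field to 0L.
        this.lastTimeReceivedElement = clock.getAsLong();
    }

    // Mirrors extractTimestamp: track the highest event time and the arrival time.
    void onElement(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastTimeReceivedElement = clock.getAsLong();
    }

    // Mirrors getCurrentWatermark: fall back to wall clock minus OFFSET when idle.
    long currentWatermark() {
        long now = clock.getAsLong();
        if (now - lastTimeReceivedElement >= MAX_IDLE) {
            maxTimestamp = Math.max(maxTimestamp, now - OFFSET);
        }
        return maxTimestamp;
    }

    public static void main(String[] args) {
        long[] now = {1_000_000L};                 // fake clock, in milliseconds
        IdleAwareWatermark wm = new IdleAwareWatermark(() -> now[0]);
        wm.onElement(900_000L);
        System.out.println(wm.currentWatermark()); // driven by event time
        now[0] += MAX_IDLE;                        // source stays silent for 5 minutes
        System.out.println(wm.currentWatermark()); // driven by the processing-time fallback
    }
}
```

With the fake clock in main, the watermark stays at the last event timestamp while elements keep arriving and jumps to the clock value minus OFFSET once the source has been silent for MAX_IDLE. Because both updates go through Math.max, the watermark never moves backwards when elements resume.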
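
Note on Dian's remark that the watermark is global rather than per key: the effect on disconnect detection can be illustrated with a toy model. This is a simplified sketch and not how Flink CEP is implemented; the class DisconnectDetector, its onEvent method, and the device ids are hypothetical, and the watermark is modelled as the highest timestamp seen across all devices.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model: per-device deadlines, expired by one watermark shared by all devices.
final class DisconnectDetector {
    private final long timeoutMillis;
    private final Map<String, Long> deadlines = new HashMap<>();
    private long watermark = Long.MIN_VALUE; // shared by every device (assumption of the model)

    DisconnectDetector(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Records an event, advances the shared watermark, and returns the devices
    // whose deadline has been passed by that watermark.
    List<String> onEvent(String deviceId, long timestamp) {
        deadlines.put(deviceId, timestamp + timeoutMillis);
        watermark = Math.max(watermark, timestamp);

        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= watermark) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        DisconnectDetector detector = new DisconnectDetector(10_000L);
        System.out.println(detector.onEvent("A", 0L));      // A starts its 10 s window
        System.out.println(detector.onEvent("B", 5_000L));  // B is still talking
        System.out.println(detector.onEvent("B", 12_000L)); // B's event pushes time past A's deadline
    }
}
```

In this model device A is reported as timed out only because device B keeps sending events and moves the shared watermark forward. With a single device and no further events, nothing advances the watermark, which is the situation where an idle-aware assigner like the one quoted in the thread is needed.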