Hello, Dian!

Thank you very much for your explanations. In my case, the CEP patterns are based on event time. Also, as you said, I have many devices and many ports on those devices, so I'm using keyed streams. I would like to know which device was disconnected on which port. Is that feasible in Flink? If yes, in which version and how?
Regards,
Florin

On Thu, Dec 13, 2018 at 4:37 AM fudian.fd <fudian...@alibaba-inc.com> wrote:

> Hi Florin,
>
> Are you using processing time or event time? The JIRA FLINK-7384 allows
> emitting timed-out patterns without having to wait for the next element ONLY in
> processing time. For event time, it still needs the watermark to trigger
> the emitting of matched or timed-out patterns. Besides, the watermark is
> global for the whole job, not per key. In your application, SNMP will not
> send any more data for a device once it is disconnected. But if there are
> many devices to be monitored, the watermark will still progress as long as
> some devices are connected. If there are no
> elements at all, the mentioned assigner will look like the following:
>
> class PeriodicExtractor implements AssignerWithPeriodicWatermarks<Long> {
>
>     private volatile long maxTimestamp = Long.MIN_VALUE;
>
>     private static final long MAX_IDLE = 5 * 60 * 1000; // 5 minutes
>     private static final long OFFSET = 30 * 1000; // 30 seconds
>     private volatile long lastTimeReceivedElement = 0L;
>
>     @Override
>     public long extractTimestamp(Long element, long previousElementTimestamp) {
>         maxTimestamp = Math.max(maxTimestamp, element);
>         // Remember (in processing time) when the last element arrived.
>         lastTimeReceivedElement = System.currentTimeMillis();
>         return element;
>     }
>
>     @Nullable
>     @Override
>     public Watermark getCurrentWatermark() {
>         // After MAX_IDLE without elements, advance based on processing time.
>         if (System.currentTimeMillis() - lastTimeReceivedElement >= MAX_IDLE) {
>             maxTimestamp = Math.max(maxTimestamp, System.currentTimeMillis() - OFFSET);
>         }
>         return new Watermark(maxTimestamp);
>     }
> }
>
> Regards,
> Dian
>
> On Dec 13, 2018, at 12:06 AM, Andrey Zagrebin <and...@data-artisans.com> wrote:
>
> Hi Florin,
>
> I think Dawid might help you. I am pulling him into the discussion.
>
> Best,
> Andrey
>
> On 12 Dec 2018, at 16:24, Spico Florin <spicoflo...@gmail.com> wrote:
>
> Hello!
> I have the same question as in this Stack Overflow post,
> https://stackoverflow.com/questions/50356981/apache-flink-cep-how-to-detect-if-event-did-not-occur-within-x-seconds?rq=1,
> because I need the same functionality.
>
> My use case is to detect when a device is disconnected from a server (for
> example).
> From SNMP I'm sending the data to Kafka, then to Flink CEP, and checking
> whether the connection is established within X seconds.
> If the window of X seconds times out, then I can consider the device
> disconnected.
> The issue is that, when the device is disconnected, SNMP doesn't send
> any more data about the port that was used. Thus no event is sent to CEP.
> Only when the device is connected again is the disconnected event
> triggered.
>
> The same functionality was requested in this post too:
> https://stackoverflow.com/questions/51399973/flink-time-window-based-on-event-time-output-nothing
>
> I checked the status of the JIRA ticket regarding this feature:
>
> https://issues.apache.org/jira/browse/FLINK-7384
>
> but unfortunately it is still open.
>
> In the official documentation,
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html,
> in the paragraph on idling sources, the situation
> described resembles what we need:
> "Currently, with pure event time watermarks generators, watermarks can not
> progress if there are no elements to be processed. That means in case of
> gap in the
> incoming data, event time will not progress and for example the window
> operator will not be triggered and thus existing windows will not be able
> to produce any output data.
> To circumvent this one can use periodic watermark assigners that don’t
> only assign based on element timestamps. An example solution could be
> an assigner that switches to using current processing time as the time
> basis after not observing new events for a while."
>
> Unfortunately, there is no example of what the mentioned assigner looks like. A full
> example would help us check whether the mentioned solution works.
>
> Therefore, can you please share your thoughts about this feature?
> Is it feasible? If yes, please provide the Flink version that solves it and an
> example.
> Or is it still a limitation, with no plan to solve it in the near future?
>
> I look forward to your answers.
>
> Best regards,
> Florin
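
Since the thread notes that the watermark is not tracked per key, one way to reason about "which device, on which port" is to keep a last-seen time per (device, port) key and flag the keys that stay silent longer than a threshold. The following is only a minimal plain-Java sketch of that idea. It is not Flink code and was not proposed in the thread: the class `IdlePortTracker`, its methods, and the `device/port` key format are all hypothetical, and time is passed in explicitly so the logic can be checked deterministically.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (no Flink dependency): tracks the last time each
 * device/port key was seen and reports keys that have gone silent.
 */
class IdlePortTracker {
    private final long maxIdleMillis;
    // Last time (in ms) an event was seen for each "device/port" key.
    private final Map<String, Long> lastSeen = new HashMap<>();

    IdlePortTracker(long maxIdleMillis) {
        this.maxIdleMillis = maxIdleMillis;
    }

    /** Records that an event for the given device and port arrived at nowMillis. */
    void onEvent(String device, int port, long nowMillis) {
        lastSeen.put(device + "/" + port, nowMillis);
    }

    /**
     * Returns, in sorted order, every key silent for at least maxIdleMillis,
     * and stops tracking those keys so each disconnect is reported once.
     */
    List<String> collectDisconnected(long nowMillis) {
        List<String> disconnected = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= maxIdleMillis) {
                disconnected.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(disconnected);
        return disconnected;
    }

    public static void main(String[] args) {
        IdlePortTracker tracker = new IdlePortTracker(5_000L);
        tracker.onEvent("dev-1", 1, 0L);
        tracker.onEvent("dev-1", 2, 4_000L);
        // Check for silent keys at two later points in time.
        System.out.println(tracker.collectDisconnected(5_000L));
        System.out.println(tracker.collectDisconnected(9_000L));
    }
}
```

In a Flink job the same bookkeeping would presumably live in keyed state with timers (for example in a `KeyedProcessFunction` on the stream keyed by device and port), with the timer firing in place of the explicit `collectDisconnected` call. That mapping is an assumption here, not something confirmed in the thread.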