Hi Vitor,

the CEP operators are not working on real windows. What they do is to use a
NFA to track the state of multiple ongoing sequences. In order to store the
element efficiently, a kind of shared buffer with versioning is used. Once
a sequence has reached a final state, the sequence of elements is
backtracked in the shared buffer to produce the final result.

So in order to get access to the previous elements of a non-finished
sequence, we could simply apply the same mechanism just without removing
the sequence from the shared buffer. This would of course be a bit more
costly since for every state you retrieve the sequence of elements which
led to this state.

But we could offer two filter conditions. One which is more light-weight
and only offers access to the current element. And another filter condition
where you have access to the previous elements. The second variant might
make sense if you can prune early many false sequences.

Cheers,
Till

On Thu, Mar 3, 2016 at 2:03 PM, Vitor Vieira <vitorsv.vie...@gmail.com>
wrote:

> Hi Till,
>
> Idk if the windowing package should provide functions to operate on the
> internal elements.
>
> What is the easiest way, or is it possible to get, for example, the last
> event of a window, lets say a 5 second window?
>
> Rgds,
>
> Vitor Vieira
> @notvitor
>
> 2016-03-03 7:29 GMT-03:00 Till Rohrmann <till.rohrm...@gmail.com>:
>
>> Hi Jerry,
>>
>> at the moment it is not yet possible to access previous elements in the
>> filter function of an individual element. Therefore, you have to check for
>> the condition “B is 5 days after A” in the final select statement.
>> Giving this context to the where clause would be indeed a nice addition
>> to the CEP library. If you want, then you could file a JIRA ticket for it.
>>
>> Here is a simple example how you could solve your problem with the
>> current means:
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> // Tuple3(Key, Timestamp, Payload)
>> DataStream<Tuple3<Integer, Long, String>> input = 
>> env.fromElements(Tuple3.of(1, 1000L, "first event"), Tuple3.of(1, 2000L, 
>> "second event"), Tuple3.of(1, 20000L, "third event"));
>>
>> Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, 
>> Long, String>>begin("A").followedBy("B").next("C");
>>
>> DataStream<String> result = CEP.pattern(input.keyBy(0), 
>> pattern).flatSelect(new PatternFlatSelectFunction<Tuple3<Integer, Long, 
>> String>, String>() {
>>     @Override
>>     public void flatSelect(Map<String, Tuple3<Integer, Long, String>> map, 
>> Collector<String> collector) throws Exception {
>>         Tuple3<Integer, Long, String> a = map.get("A");
>>         Tuple3<Integer, Long, String> b = map.get("B");
>>
>>         // check that a and b have at least 1000 ms in between
>>         if (b.f1 - a.f1 >= 1000) {
>>             collector.collect(a.f2);
>>         }
>>     }
>> });
>>
>> result.print();
>>
>> env.execute("CEP example");
>>
>> Cheers,
>> Till
>> ​
>>
>> On Thu, Mar 3, 2016 at 1:46 AM, Vitor Vieira <vitorsv.vie...@gmail.com>
>> wrote:
>>
>>> Hi Jerry,
>>>
>>> I'm currently evaluating the CEP library too, probably doing something
>>> similar.
>>>
>>> Something like... comparing the 'offset' of the last event in different
>>> time windows, each window, based on the event type, occurring like
>>> realtime, with this same day/hour/minute a week ago/15d/1month/etc...
>>>
>>> I plan to share some CEP examples once I finish this engine.
>>>
>>> -@notvitor
>>>
>>>
>>> 2016-03-02 19:28 GMT-03:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>>> Hi Jerry,
>>>>
>>>> I haven't used the CEP features yet, so I cannot comment on your
>>>> requirements.
>>>> In case you are looking for the CEP documentation, here it is:
>>>>
>>>> -->
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>>>
>>>> The CEP features will be included in the upcoming 1.0.0 release (which
>>>> we currently vote on).
>>>> I think you would be one of the first persons to use it. Please let us
>>>> know, if you find any problems.
>>>>
>>>> Thanks, Fabian
>>>>
>>>>
>>>> 2016-03-02 23:12 GMT+01:00 Jerry Lam <chiling...@gmail.com>:
>>>>
>>>>> Hi Flink users and developers,
>>>>>
>>>>> I'm trying to learn the CEP library. How can I express
>>>>> A-followBy->B-next->C where B is 5 days after A occurs. What I'm trying to
>>>>> get a hold of is the events that matches A when I'm processing B.
>>>>>
>>>>> Is this supported?
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Jerry
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to