Absolutely. For one, a simple "m out of n true conditions" pattern, where n
is defined by a time range, is a little under-optimized: just using times(m)
will not short-circuit the partial patterns until the time range has elapsed,
even when there is no way m true conditions can still be achieved ( we have
already had more than n-m false conditions ). That makes sense, as we have
defined a within() condition predicated on n.
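
As a rough illustration of that short circuit (a plain-Java sketch, not
FlinkCEP code; the class MOutOfNFeasibility and its methods are hypothetical
names made up for this example), the check only needs two counters per window
of n slots:

```java
/**
 * Hypothetical helper, not part of FlinkCEP: tracks whether m true
 * conditions are still reachable within a window of n slots.
 */
final class MOutOfNFeasibility {
    private final int m;   // required number of true conditions
    private final int n;   // total number of slots in the window
    private int trues;     // true conditions seen so far
    private int falses;    // false conditions seen so far

    MOutOfNFeasibility(int m, int n) {
        if (m <= 0 || n < m) {
            throw new IllegalArgumentException("need 0 < m <= n");
        }
        this.m = m;
        this.n = n;
    }

    /** Records the outcome of one slot. */
    void record(boolean conditionHeld) {
        if (conditionHeld) {
            trues++;
        } else {
            falses++;
        }
    }

    /** True once m true conditions have been seen. */
    boolean matched() {
        return trues >= m;
    }

    /** True once more than n - m slots were false: m trues can no longer fit in n slots. */
    boolean hopeless() {
        return falses > n - m;
    }

    public static void main(String[] args) {
        // "3 out of 5": the window can tolerate at most 2 false slots.
        MOutOfNFeasibility window = new MOutOfNFeasibility(3, 5);
        boolean[] slots = {true, false, false, false, true};
        for (int i = 0; i < slots.length; i++) {
            window.record(slots[i]);
            if (window.hopeless()) {
                System.out.println("partial match can be dropped after slot " + (i + 1));
                break;
            }
        }
    }
}
```

Once hopeless() returns true, the partial match could in principle be dropped
without waiting for within() to expire.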

I think the way one would do it is to use an iterative condition, look at all
events ( including the false ones, though that can be expensive ) and stop a
pattern. One question I had: an NFA can be in a FinalState or a StopState.

What would constitute a StopState ?

On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi Vishal,
>
> Thanks a lot for sharing your experience and the potential caveats to
> consider when
> specifying your pattern.
>
> I agree that there is room for improvement when it comes to the state
> checkpointed in Flink.
> We already have some ideas but still, as you also said, the bulk of the
> space consumption
> comes from the pattern definition, so it could be nice if more people did
> the same, i.e. sharing
> their experience, and why not, compiling a guide of things to avoid and
> put it along the rest
> of FlinkCEP documentation.
>
> What do you think?
>
> Kostas
>
>
>
> On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> Hello all,  There were recent changes to the flink master that I pulled in
> and that *seems* to have solved our issue.
>
> Few points
>
> * CEP is heavy, as the NFA transition matrix kept as state, which can
> possibly be n^2 ( worst case ), can easily blow up space requirements. The
> after match skip strategy is likely to play a crucial role in keeping the
> state lean:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy
> In our case we do not require partial matches within a match to contribute
> to another potential match ( noise for us ), and thus *SKIP_PAST_LAST_EVENT*
> was used, which on match will prune the SharedBuffer ( almost reset it ).
>
> * The argument that the pattern events should be lean holds much more in
> CEP due to the potential exponential increase in space requirements.
>
> * The nature of the pattern will require consideration if state does blow
> up for you.
>
> Apart from that, I am still not sure why toString() on SharedBuffer was
> called to get an OOM to begin with.
>
>
>
> On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> We could not recreate in a controlled setup, but here are a few notes
>> that we have gathered on a simple  "times(n),within(..)"
>>
>> In case where the Event does not create a Final or Stop state
>>
>> * As an NFA processes an Event, the NFA mutates if the Event is a true
>> Event. Each computation is a counter that keeps track of a partial match,
>> with each true Event added to the already existing partial match for that
>> computation unit. Essentially, for n Events, if each Event is true there
>> will be roughly n-1 computations, each representing an Event from 1 to n-1
>> ( so 1, or the first, will have n-1 events in the partial match, 2 has n-2
>> events and so on, and n-1 has the last event as a partial match ).
>>
>> * If the WM progresses  beyond the ts of the 1st computation, that
>> partial match is pruned.
>>
>> * It makes sure that a SharedBufferEntry is pruned only if the count of
>> Edges originating from it reduces to 0 ( the internalRemove() which uses a
>> Stack) , which should happen as WM keeps progressing to the nth element for
>> unfulfilled patterns. A "null" ( not a fan ) event is used to establish  a
>> WM progression
>>
>>
>> In case there is a FinalState  ( and we skipToFirstAfterLast )
>>
>> * The NFA will prune ( release ) all partial matches, prune the shared
>> buffer and emit the current match. The computations should now be empty.
>>
>> There is a lot to it, but is that roughly what is done in that code ?
>>
>>
>>
>> Few questions.
>>
>> * What we have seen is that the call to the toString method of
>> SharedBuffer is where the OOM occurs. In the code there is no call to a
>> Log, so we are not sure why, or by whom, that method is called. Surely that
>> is not part of the Serialization/DeSer routine, or is it ( very surprising
>> if it is )?
>> * There is no out of the box implementation of an "m out of n" pattern
>> match. We have to resort to n in range ( m * time series slot ), which we
>> do. This is fine, but what it does not allow is an optimization where, once
>> enough false conditions are seen, one can prune. Simply speaking, if more
>> than n-m falses have been seen there is no way that out of n there will
>> ever be m trues, and thus the SharedBuffer can be pruned to the last true
>> seen ( very akin to skipToFirstAfterLast ).
>>
>> We will keep instrumenting the code ( which apart from the null message
>> is easily understandable ) but would love to hear your feedback.
>>
>> On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Thanks a lot Vishal!
>>>
>>> We are looking forward to a test case that reproduces the failure.
>>>
>>> Kostas
>>>
>>>
>>> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com>
>>> wrote:
>>>
>>> This is the pattern. Will create a test case.
>>>
>>> /**
>>>  *
>>>  * @param condition a single condition applied as the acceptance criterion
>>>  * @param params defining the bounds of the pattern.
>>>  * @param <U> the element in the stream
>>>  * @return compiled pattern along with the params.
>>>  */
>>> public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
>>>         SimpleCondition<U> condition,
>>>         RelaxedContiguityWithinTime params,
>>>         RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
>>>         String patternId) {
>>>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>>>     // The first element must satisfy the condition.
>>>     Pattern<U, ?> pattern = Pattern.
>>>             <U>begin(START).
>>>             where(condition);
>>>     // The remaining elementCount - 1 matches follow with relaxed contiguity.
>>>     if (params.elementCount > 1) pattern = pattern.
>>>             followedBy(REST).
>>>             where(condition).
>>>             times(params.elementCount - 1);
>>>
>>>     // Bound the whole match to seriesLength periods.
>>>     return new RelaxedContiguousPattern<U>(
>>>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>>>             params,
>>>             params.elementCount > 1,
>>>             params.period.duration,
>>>             mapFunc,
>>>             patternId
>>>     );
>>> }
>>>
>>>
>>>
>>>
>>> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <
>>> wysakowicz.da...@gmail.com> wrote:
>>>
>>>> Could you provide some example to reproduce the case? Or the Pattern
>>>> that you are using? It would help track down the issue.
>>>>
>>>> > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com>
>>>> wrote:
>>>> >
>>>> > I have pulled in the flink master cep library and the runtime ( the
>>>> cluster ) is configured to work against the latest and greatest. This does
>>>> not happen with smaller range patterns ( 3 out of 5 , 1 of 3 etc) but is
>>>> always an issue when it is a larger range ( 20 out of 25 with range of 8
>>>> hours ) . Does that make sense?
>>>> >
>>>> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <
>>>> wysakowicz.da...@gmail.com> wrote:
>>>> > This problem sounds very similar to this one that was fixed for 1.4.1
>>>> and 1.5.0:
>>>> > https://issues.apache.org/jira/browse/FLINK-8226
>>>> >
>>>> > Could you check if that helps with your problem too?
>>>> >
>>>> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com>
>>>> wrote:
>>>> > >
>>>> > > I have flink master CEP library code imported to  a 1.4 build.
>>>> > >
>>>> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>> > > A new one
>>>> > >
>>>> > > java.lang.OutOfMemoryError: Java heap space
>>>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>>>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>>> > > .
>>>> > > .
>>>> > > .
>>>> > > It is the toString() on SharedBuffer no doubt. Some recursive loop ?
>>>> > >
>>>> > >
>>>> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>> > > It happens when it is about to throw an exception and calls
>>>> > > sharedBuffer.toString(), because of the check:
>>>> > >
>>>> > >
>>>> > > int id = sharedBuffer.entryId;
>>>> > > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
>>>> > >
>>>> > >
>>>> > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>> > > The watermark has not moved for this pattern to succeed ( or
>>>> > > otherwise ). The issue though is that it is pretty early in the pipe
>>>> > > ( like within a minute ). I am replaying from a kafka topic, but the
>>>> > > keyed operator has emitted no more than 1500 plus elements to
>>>> > > SelectCEPOperator ( very visible on the UI ), so I am sure not enough
>>>> > > elements have been added to the SharedBuffer to create memory stress.
>>>> > >
>>>> > > The nature of the input stream is that events are pushed out with a
>>>> > > specific timestamp ( it is a time series and the timestamp is the
>>>> > > beginning of the time slot ), as in one will have a bunch of elements
>>>> > > that have a constant timestamp till the next batch appears.
>>>> > >
>>>> > > A batch though does not have more elements than the number of keys
>>>> > > ( 600 ).
>>>> > >
>>>> > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>> > > This is a pretty simple pattern, as in I hardly have 1500 elements
>>>> ( across 600 keys at the max ) put in
>>>> > > and though I have a pretty wide range , as in I am looking at a
>>>> relaxed pattern ( like 40 true conditions in 6 hours ),
>>>> > > I get this. I have the EventTime turned on.
>>>> > >
>>>> > >
>>>> > > java.lang.OutOfMemoryError: Java heap space
>>>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>>>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>>>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>> > >       at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>>>> > > .
>>>> > > .
>>>> > > .
>>>> > >
>>>> > > Has anyone seen this issue ?
>>>> > >
>>>> > >
>>>> > >
>>>> > >
>>>> > >
>>>> >
>>>> >
>>>>
>>>>
>>>
>>>
>>
>
>
