Will do.

On Wed, Mar 7, 2018 at 9:33 AM, Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Why not open a JIRA and work on adding some debug
> statements that you consider useful?
>
> This could help the next user that faces the same issues ;)
>
> Kostas
>
> On Mar 7, 2018, at 3:29 PM, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> Aah, yes, we never had a sink state, so we never came across a case where it
> was exercised.  When the range expires, it is a prune rather than a
> stop state ( we were expecting a stop state ), which is somewhat
> misleading if we hold a stop state to be one " that invalidates a partial match ",
> whatever the reason may be.
>
> Again, I would also suggest ( though not a biggy ) that strategic debug
> statements in the CEP core would help folks see what actually happens.
> We instrumented the code to follow the construction of the NFA, and that was
> very helpful.
>
> On Wed, Mar 7, 2018 at 9:23 AM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Vishal,
>>
>> A stopState is a state that invalidates a partial match, e.g. with
>> a.notFollowedBy(b).followedBy(c):
>> if you have an “a” and then you see a “b”, the partial match is invalidated.
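>>
>> For illustration, a minimal sketch of a pattern whose NFA contains such a
>> stop state ( hedged: assumes the FlinkCEP Java API; the Event type and its
>> getType() accessor are placeholders ):
>>
>> Pattern<Event, ?> withStop = Pattern.<Event>begin("a")
>>         .where(new SimpleCondition<Event>() {
>>             @Override public boolean filter(Event e) { return "a".equals(e.getType()); }
>>         })
>>         .notFollowedBy("b")   // seeing a matching "b" invalidates the partial match
>>         .where(new SimpleCondition<Event>() {
>>             @Override public boolean filter(Event e) { return "b".equals(e.getType()); }
>>         })
>>         .followedBy("c")
>>         .where(new SimpleCondition<Event>() {
>>             @Override public boolean filter(Event e) { return "c".equals(e.getType()); }
>>         });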
>>
>> A finalState is the one where a match has been found.
>>
>> Kostas
>>
>>
>> On Mar 7, 2018, at 3:20 PM, Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>>
>> Absolutely.  For one, a simple " m out of n true conditions ", where n is
>> defined by the range, is a little under-optimized: just using times(m) will
>> not short-circuit the partial matches till the time range is exhausted, even
>> if there is no way m true conditions can still be achieved ( we have already
>> had n-m false conditions ). That makes sense, as we have defined a within()
>> condition predicated on n.
>>
>> I think the way one would do it is to use an iterative condition and look at
>> all events ( including the false ones, though that can be expensive )
>> and stop the pattern. One question I had is that an NFA can be in a
>> FinalState or a StopState.
>>
>> What would constitute a StopState ?
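>>
>> A minimal sketch of that iterative-condition idea ( hedged: Event, its
>> isTrue()/getTimestamp() accessors, the pattern name "trues" and maxSpanMillis
>> are placeholders; note that getEventsForPattern() only exposes events already
>> accepted into the partial match, so false events are not visible here ):
>>
>> IterativeCondition<Event> cond = new IterativeCondition<Event>() {
>>     @Override
>>     public boolean filter(Event value, Context<Event> ctx) throws Exception {
>>         if (!value.isTrue()) {
>>             return false;
>>         }
>>         // Walk the events accumulated so far for this partial match;
>>         // this iteration is the potentially expensive part.
>>         long firstTs = value.getTimestamp();
>>         for (Event e : ctx.getEventsForPattern("trues")) {
>>             firstTs = Math.min(firstTs, e.getTimestamp());
>>         }
>>         // Accept only while the current event still falls within the budget
>>         // relative to the first event of the partial match; rejecting here
>>         // does not prune the partial match itself, it only refuses to extend it.
>>         return value.getTimestamp() - firstTs <= maxSpanMillis;
>>     }
>> };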
>>
>> On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> Thanks a lot for sharing your experience and the potential caveats to
>>> consider when
>>> specifying a pattern.
>>>
>>> I agree that there is room for improvement when it comes to the state
>>> checkpointed in Flink.
>>> We already have some ideas, but still, as you also said, the bulk of the
>>> space consumption
>>> comes from the pattern definition, so it would be nice if more people
>>> did the same, i.e. shared
>>> their experience and, why not, compiled a guide of things to avoid, to be
>>> published along with the rest
>>> of the FlinkCEP documentation.
>>>
>>> What do you think?
>>>
>>> Kostas
>>>
>>>
>>>
>>> On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <vishal.santo...@gmail.com>
>>> wrote:
>>>
>>> Hello all,  there were recent changes to the Flink master that I pulled
>>> in, and they *seem* to have solved our issue.
>>>
>>> A few points:
>>>
>>> * CEP is heavy: the NFA transition matrix is kept as state and can be
>>> O(n^2) in the worst case, so it can easily blow up space requirements.  The
>>> after-match skip strategy is likely to play a crucial role in keeping the
>>> state lean: https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy.
>>> In our case we do not require partial matches within a match to contribute
>>> to another potential match ( noise for us ), and thus *SKIP_PAST_LAST_EVENT*
>>> was used, which on match will prune the SharedBuffer ( almost reset it );
>>> see the sketch after this list.
>>>
>>> * The argument that pattern events should be lean holds even more strongly
>>> in CEP, due to the potential exponential increase in space requirements.
>>>
>>> * The nature of the pattern itself needs consideration if state does
>>> blow up for you.
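>>>
>>> A minimal sketch of wiring in the skip strategy referenced above ( hedged:
>>> assumes the CEP API on current master, where Pattern.begin() accepts an
>>> AfterMatchSkipStrategy; Event, cond and m are placeholders ):
>>>
>>> AfterMatchSkipStrategy skip = AfterMatchSkipStrategy.skipPastLastEvent();
>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("start", skip)
>>>         .where(cond)             // acceptance condition
>>>         .times(m)                // m matching events
>>>         .within(Time.hours(8));  // bound the match in event time
>>>
>>> On a completed match this drops the partial matches that started before the
>>> match ended, which is what nearly resets the SharedBuffer.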
>>>
>>> Apart from that, I am still not sure why toString() on SharedBuffer was
>>> called at all, producing the OOM to begin with.
>>>
>>>
>>>
>>> On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> We could not recreate it in a controlled setup, but here are a few notes
>>>> that we have gathered on a simple "times(n), within(..)" pattern.
>>>>
>>>> In the case where the Event does not lead to a Final or Stop state:
>>>>
>>>> * As the NFA processes an Event, it mutates if the Event evaluates to true.
>>>> Each computation is a counter that keeps track of a partial match, and each
>>>> true Event extends the already existing partial match for that computation
>>>> unit. Essentially, for n Events that all evaluate to true, there will be
>>>> roughly n-1 computations, each representing a match starting at an Event
>>>> from 1 to n-1 ( so the first has n-1 events in its partial match, the second
>>>> has n-2 events, and so on, until the (n-1)th has just the last event as its
>>>> partial match ).
>>>>
>>>> * If the watermark progresses beyond the timestamp of the 1st computation,
>>>> that partial match is pruned.
>>>>
>>>> * It makes sure that a SharedBufferEntry is pruned only when the count of
>>>> Edges originating from it drops to 0 ( via internalRemove(), which uses a
>>>> Stack ), which should happen as the watermark keeps progressing towards the
>>>> nth element for unfulfilled patterns. A "null" event ( not a fan ) is used
>>>> to signal watermark progression.
>>>>
>>>>
>>>> In the case where there is a FinalState ( and we skipToFirstAfterLast ):
>>>>
>>>> * The NFA will prune ( release ) all partial matches, prune the
>>>> shared buffer, and emit the current match. The computations should now
>>>> be empty.
>>>>
>>>> There is a lot to it, but is that roughly what is done in that code ?
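>>>>
>>>> For concreteness, the simple shape being discussed ( hedged sketch; Event,
>>>> cond and n are placeholders ):
>>>>
>>>> Pattern<Event, ?> shape = Pattern.<Event>begin("hits")
>>>>         .where(cond)             // a true Event extends partial matches
>>>>         .times(n)                // relaxed contiguity between events
>>>>         .within(Time.hours(8));  // the range
>>>>
>>>> With relaxed contiguity, every true Event can both extend the existing
>>>> partial matches and start a new one, which is where the roughly n-1
>>>> concurrent computations described above come from.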
>>>>
>>>>
>>>>
>>>> A few questions:
>>>>
>>>> * What we have seen is that the call to the toString() method of
>>>> SharedBuffer is where the OOM occurs. Now, in the code there is no call to a
>>>> logger, so we are not sure why, or by whom, that method is called. Surely it
>>>> is not part of the serialization/deserialization routine, or is it ( very
>>>> surprising if it is )?
>>>> * There is no out-of-the-box implementation of an "m out of n" pattern
>>>> match. We have to resort to m true events within a range of n time-series
>>>> slots, which we do ( see the sketch after this list ). This is fine, but
>>>> what it does not allow is an optimization: once n-m false conditions have
>>>> been seen, there is no way that out of n events there will ever be m trues,
>>>> and thus the SharedBuffer could be pruned up to the last true seen ( very
>>>> akin to skipToFirstAfterLast ).
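>>>>
>>>> The workaround above, as a hedged sketch ( m, n, slotMinutes, Event and
>>>> cond are placeholders; this mirrors the pattern factory quoted further down
>>>> the thread ):
>>>>
>>>> Pattern<Event, ?> mOfN = Pattern.<Event>begin("first")
>>>>         .where(cond)
>>>>         .followedBy("rest")
>>>>         .where(cond)
>>>>         .times(m - 1)                            // m trues in total
>>>>         .within(Time.minutes(n * slotMinutes));  // the n-slot range
>>>>
>>>> Nothing in this shape prunes early once n-m falses have been seen; that is
>>>> exactly the missing optimization.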
>>>>
>>>> We will keep instrumenting the code ( which, apart from the null message,
>>>> is easily understandable ), but would love to hear your feedback.
>>>>
>>>> On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <
>>>> k.klou...@data-artisans.com> wrote:
>>>>
>>>>> Thanks a lot Vishal!
>>>>>
>>>>> We are looking forward to a test case that reproduces the failure.
>>>>>
>>>>> Kostas
>>>>>
>>>>>
>>>>> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> This is the pattern. Will create a test case.
>>>>>
>>>>> /**
>>>>>  * @param condition a single condition applied as the acceptance criterion
>>>>>  * @param params defines the bounds of the pattern
>>>>>  * @param <U> the element type in the stream
>>>>>  * @return the compiled pattern along with the params
>>>>>  */
>>>>> public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
>>>>>         SimpleCondition<U> condition,
>>>>>         RelaxedContiguityWithinTime params,
>>>>>         RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
>>>>>         String patternId) {
>>>>>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>>>>>     // The first true event starts the match.
>>>>>     Pattern<U, ?> pattern = Pattern.
>>>>>             <U>begin(START).
>>>>>             where(condition);
>>>>>     // The remaining elementCount - 1 true events follow with relaxed contiguity.
>>>>>     if (params.elementCount > 1) pattern = pattern.
>>>>>             followedBy(REST).
>>>>>             where(condition).
>>>>>             times(params.elementCount - 1);
>>>>>
>>>>>     // Bound the whole match by the range: seriesLength slots of period.duration minutes.
>>>>>     return new RelaxedContiguousPattern<U>(
>>>>>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>>>>>             params,
>>>>>             params.elementCount > 1,
>>>>>             params.period.duration,
>>>>>             mapFunc,
>>>>>             patternId
>>>>>     );
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <
>>>>> wysakowicz.da...@gmail.com> wrote:
>>>>>
>>>>>> Could you provide an example that reproduces the case, or the Pattern
>>>>>> that you are using? It would help track down the issue.
>>>>>>
>>>>>> > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > I have pulled in the Flink master CEP library, and the runtime ( the
>>>>>> > cluster ) is configured to work against the latest and greatest. This
>>>>>> > does not happen with smaller range patterns ( 3 out of 5, 1 out of 3,
>>>>>> > etc. ) but is always an issue with a larger range ( 20 out of 25 with a
>>>>>> > range of 8 hours ). Does that make sense?
>>>>>> >
>>>>>> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <
>>>>>> wysakowicz.da...@gmail.com> wrote:
>>>>>> > This problem sounds very similar to this one that was fixed for
>>>>>> 1.4.1 and 1.5.0:
>>>>>> > https://issues.apache.org/jira/browse/FLINK-8226
>>>>>> >
>>>>>> > Could you check if that helps with your problem too?
>>>>>> >
>>>>>> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>> > >
>>>>>> > > I have the Flink master CEP library code imported into a 1.4 build.
>>>>>> > >
>>>>>> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>> > > A new one
>>>>>> > >
>>>>>> > > java.lang.OutOfMemoryError: Java heap space
>>>>>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>>>>>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>>>>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
>>>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>>>>> > > .
>>>>>> > > .
>>>>>> > > .
>>>>>> > > It is the toString() on SharedBuffer, no doubt. Some recursive loop ?
>>>>>> > >
>>>>>> > >
>>>>>> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>> > > It happens when the check builds its message and calls
>>>>>> > > sharedBuffer.toString(); note the concatenation below is evaluated
>>>>>> > > eagerly, on every call, not just on failure....
>>>>>> > >
>>>>>> > >
>>>>>> > > int id = sharedBuffer.entryId;
>>>>>> > > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
>>>>>> > >
>>>>>> > >
>>>>>> > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>> > > The watermark has not moved for this pattern to succeed ( or
>>>>>> > > otherwise ); the issue, though, is that it happens pretty early in the
>>>>>> > > pipe ( like within a minute ).  I am replaying from a Kafka topic, but
>>>>>> > > the keyed operator has emitted no more than 1500-odd elements to
>>>>>> > > SelectCEPOperator ( very visible on the UI ), so I am sure not enough
>>>>>> > > elements have been added to the SharedBuffer to create memory stress.
>>>>>> > >
>>>>>> > > The nature of the input stream is that events are pushed out with a
>>>>>> > > specific timestamp ( it is a time series, and the timestamp is the
>>>>>> > > beginning of the time slot ), as in one will have a bunch of elements
>>>>>> > > that share a constant timestamp till the next batch appears.
>>>>>> > >
>>>>>> > > A batch, though, does not have more elements than the number of keys ( 600 ).
>>>>>> > >
>>>>>> > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>> > > This is a pretty simple pattern, as in I have hardly 1500 elements (
>>>>>> > > across 600 keys at the max ) put in, and though I have a pretty wide
>>>>>> > > range, as in I am looking at a relaxed pattern ( like 40 true
>>>>>> > > conditions in 6 hours ), I get this. I have EventTime turned on.
>>>>>> > >
>>>>>> > >
>>>>>> > > java.lang.OutOfMemoryError: Java heap space
>>>>>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>>>>>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>>>>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>>>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>>>>>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>>> > >       at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>>>>>> > > .
>>>>>> > > .
>>>>>> > > .
>>>>>> > >
>>>>>> > > Has anyone seen this issue ?
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> >
>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>
>
