Why not open a JIRA and work on adding the debug statements that you consider useful?
This could help the next user who faces the same issue ;)

Kostas

> On Mar 7, 2018, at 3:29 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> 
> Aah, yes, we never had a stop state, so we never came across a case where it was exercised. When the range expires it is a prune rather than a stop state (we were expecting a stop state), which is somewhat misleading if we hold a stop state to be one "that invalidates a partial match", whatever the reason may be.
> 
> Again, I would also suggest (though it is not a biggie) that strategic debug statements in the CEP core would help folks see what actually happens. We instrumented the code to follow the construction of the NFA, and that was very helpful.
> 
> On Wed, Mar 7, 2018 at 9:23 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Hi Vishal,
> 
> A stopState is a state that invalidates a partial match, e.g. a.notFollowedBy(b).followedBy(c).
> If you have an "a" and then you see a "b", then you invalidate the pattern.
> 
> A finalState is the one where a match has been found.
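> For illustration, a minimal sketch of a pattern whose compiled NFA contains such a stop state (the Event type and its getType() accessor are assumed here, not taken from this thread):
> 
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> 
> Pattern<Event, ?> stopStatePattern = Pattern.<Event>begin("a")
>         .where(new SimpleCondition<Event>() {
>             @Override
>             public boolean filter(Event e) { return "a".equals(e.getType()); }
>         })
>         // notFollowedBy() is what compiles into a stop state: seeing a "b"
>         // after an "a" invalidates the partial match rather than merely
>         // failing to advance it.
>         .notFollowedBy("b")
>         .where(new SimpleCondition<Event>() {
>             @Override
>             public boolean filter(Event e) { return "b".equals(e.getType()); }
>         })
>         .followedBy("c")
>         .where(new SimpleCondition<Event>() {
>             @Override
>             public boolean filter(Event e) { return "c".equals(e.getType()); }
>         });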
> 
> Kostas
> 
>> On Mar 7, 2018, at 3:20 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> 
>> Absolutely. For one, a simple "m out of n true conditions", where n is defined by the range, is a little under-optimized: just using times(m) will not short-circuit the partial patterns until the time range is exhausted, even when there is no way m true conditions can still be reached (we have already seen n-m false conditions). That makes sense, as we have defined a within() condition predicated on n.
>> 
>> I think the way one would do it is to use an iterative condition and look at all events (including the false ones, though that can be expensive) and stop the pattern. One question I had is that an NFA can be in a FinalState or a StopState.
>> 
>> What would constitute a StopState?
>> 
>> On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> Hi Vishal,
>> 
>> Thanks a lot for sharing your experience and the potential caveats to consider when specifying a pattern.
>> 
>> I agree that there is room for improvement when it comes to the state checkpointed in Flink. We already have some ideas but, as you also said, the bulk of the space consumption comes from the pattern definition. So it would be nice if more people did the same, i.e. shared their experience and, why not, compiled a guide of things to avoid to put alongside the rest of the FlinkCEP documentation.
>> 
>> What do you think?
>> 
>> Kostas
>> 
>>> On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> 
>>> Hello all. There were recent changes to the Flink master that I pulled in, and they seem to have solved our issue.
>>> 
>>> A few points:
>>> 
>>> * CEP is heavy, as the NFA keeps its transition matrix as state, which can be O(n^2) in the worst case and can easily blow up space requirements. The after-match skip strategy is likely to play a crucial role in keeping the state lean: https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy. In our case we do not require partial matches within a match to contribute to another potential match (noise for us), so SKIP_PAST_LAST_EVENT was used, which on a match will prune the SharedBuffer (almost reset it); see the sketch after this message.
>>> 
>>> * The argument that pattern events should be lean holds much more strongly in CEP, due to the potential exponential increase in space requirements.
>>> 
>>> * The nature of the pattern will require consideration if state does blow up for you.
>>> 
>>> Apart from that, I am still not sure why toString() on SharedBuffer was called to get an OOM to begin with.
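>>> As a sketch of the wiring (the Event type and its isTrueCondition() flag are assumptions for illustration, and the exact package of AfterMatchSkipStrategy may vary between Flink versions):
>>> 
>>> import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
>>> import org.apache.flink.cep.pattern.Pattern;
>>> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> 
>>> Pattern<Event, ?> leanStatePattern = Pattern
>>>         // SKIP_PAST_LAST_EVENT: on a match, discard all partial matches
>>>         // up to the last matched event, (almost) resetting the SharedBuffer.
>>>         .<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
>>>         .where(new SimpleCondition<Event>() {
>>>             @Override
>>>             public boolean filter(Event e) { return e.isTrueCondition(); }
>>>         })
>>>         .times(20)
>>>         .within(Time.hours(8));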
>>> 
>>> On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> We could not recreate this in a controlled setup, but here are a few notes we have gathered on a simple "times(n).within(..)" pattern.
>>> 
>>> In the case where an Event does not create a Final or Stop state:
>>> 
>>> * As the NFA processes Events, it mutates on each true Event. Each computation is a counter that keeps track of a partial match, and each true Event extends every already existing partial match. Essentially, for n Events that all evaluate to true, there will be roughly n-1 computations, each representing a suffix of the sequence (the first holds n-1 events in its partial match, the second n-2, and so on, down to the last, which holds just the latest event).
>>> 
>>> * If the watermark progresses beyond the timestamp of the first computation, that partial match is pruned.
>>> 
>>> * A SharedBufferEntry is pruned only when the count of Edges originating from it drops to 0 (the internalRemove(), which uses a Stack), which should happen as the watermark keeps progressing towards the nth element of an unfulfilled pattern. A "null" event (not a fan) is used to signal watermark progression.
>>> 
>>> In the case where there is a FinalState (and we skipToFirstAfterLast):
>>> 
>>> * The NFA will release all partial matches, prune the shared buffer, and emit the current match. The set of computations should now be empty.
>>> 
>>> There is a lot to it, but is that roughly what that code does?
>>> 
>>> A few questions:
>>> 
>>> * What we have seen is that the OOM occurs in a call to the toString() method of SharedBuffer. In the code there is no log call around it, so we are not sure why, or by whom, that method is called. Surely it is not part of the serialization/deserialization routine, or is it (very surprising if it is)?
>>> 
>>> * There is no out-of-the-box implementation of an "m out of n" pattern match. We have to resort to times(m) within a range of n time-series slots, which we do. This is fine, but it does not allow an optimization where a partial match is pruned as soon as enough false conditions have been seen. Simply put, once n-m false conditions have been seen, there is no way m trues out of n can still happen, and the SharedBuffer could be pruned up to the last true seen (very akin to skipToFirstAfterLast); a rough sketch follows this message.
>>> 
>>> We will keep instrumenting the code (which, apart from the null message, is easily understandable) but would love to hear your feedback.
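>>> For what it is worth, a rough, untested sketch of that pruning idea as an iterative condition. The Event type with an isTrueCondition() flag and the stage name "all" are assumptions for illustration; ctx.getEventsForPattern() is the actual FlinkCEP API:
>>> 
>>> import org.apache.flink.cep.pattern.conditions.IterativeCondition;
>>> 
>>> public class MOutOfNCondition extends IterativeCondition<Event> {
>>> 
>>>     private final int n;  // window length, in events
>>>     private final int m;  // required number of true conditions
>>> 
>>>     public MOutOfNCondition(int n, int m) {
>>>         this.n = n;
>>>         this.m = m;
>>>     }
>>> 
>>>     @Override
>>>     public boolean filter(Event value, Context<Event> ctx) throws Exception {
>>>         // Count the falses among the events already absorbed into the
>>>         // looping "all" stage; iterating the matched events on every call
>>>         // is the expensive part mentioned above.
>>>         int falses = value.isTrueCondition() ? 0 : 1;
>>>         for (Event prev : ctx.getEventsForPattern("all")) {
>>>             if (!prev.isTrueCondition()) {
>>>                 falses++;
>>>             }
>>>         }
>>>         // Once n-m falses have been seen, m trues out of n are impossible.
>>>         // Rejecting here only stops this event from extending the match;
>>>         // eagerly discarding the whole partial match is exactly the
>>>         // optimization that is missing out of the box.
>>>         return falses <= n - m;
>>>     }
>>> }
>>> 
>>> One would attach this with .where(new MOutOfNCondition(25, 20)) on a looping stage named "all".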
>>> 
>>> On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>> Thanks a lot Vishal!
>>> 
>>> We are looking forward to a test case that reproduces the failure.
>>> 
>>> Kostas
>>> 
>>>> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> 
>>>> This is the pattern. Will create a test case.
>>>> 
>>>> /**
>>>>  * @param condition a single condition applied as the acceptance criterion
>>>>  * @param params    defines the bounds of the pattern
>>>>  * @param <U>       the element type in the stream
>>>>  * @return the compiled pattern along with the params
>>>>  */
>>>> public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
>>>>         SimpleCondition<U> condition,
>>>>         RelaxedContiguityWithinTime params,
>>>>         RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
>>>>         String patternId) {
>>>>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>>>>     Pattern<U, ?> pattern = Pattern.
>>>>             <U>begin(START).
>>>>             where(condition);
>>>>     if (params.elementCount > 1) {
>>>>         pattern = pattern.
>>>>                 followedBy(REST).
>>>>                 where(condition).
>>>>                 times(params.elementCount - 1);
>>>>     }
>>>>     return new RelaxedContiguousPattern<U>(
>>>>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>>>>             params,
>>>>             params.elementCount > 1,
>>>>             params.period.duration,
>>>>             mapFunc,
>>>>             patternId);
>>>> }
>>>> 
>>>> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>>> Could you provide some example to reproduce the case, or the Pattern that you are using? It would help track down the issue.
>>>> 
>>>> > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > 
>>>> > I have pulled in the Flink master CEP library, and the runtime (the cluster) is configured to work against the latest and greatest. This does not happen with smaller-range patterns (3 out of 5, 1 out of 3, etc.) but is always an issue with a larger range (20 out of 25 within a range of 8 hours). Does that make sense?
>>>> > 
>>>> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>>> > This problem sounds very similar to one that was fixed for 1.4.1 and 1.5.0:
>>>> > https://issues.apache.org/jira/browse/FLINK-8226
>>>> > 
>>>> > Could you check if that helps with your problem too?
>>>> > 
>>>> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > > 
>>>> > > I have the Flink master CEP library code imported into a 1.4 build.
>>>> > > 
>>>> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > > A new one:
>>>> > > 
>>>> > > java.lang.OutOfMemoryError: Java heap space
>>>> > >     at java.util.Arrays.copyOf(Arrays.java:3332)
>>>> > >     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>>> > >     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>> > >     at java.lang.String.valueOf(String.java:2994)
>>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>>> > >     at java.lang.String.valueOf(String.java:2994)
>>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
>>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>> > >     at java.lang.String.valueOf(String.java:2994)
>>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>>> > >     .
>>>> > >     .
>>>> > >     .
>>>> > > 
>>>> > > It is the toString() on SharedBuffer, no doubt. Some recursive loop?
>>>> > > 
>>>> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > > It happens when it goes to throw an exception and calls sharedBuffer.toString(), because of this check:
>>>> > > 
>>>> > > int id = sharedBuffer.entryId;
>>>> > > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
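>>>> > > Note that Java evaluates the message argument eagerly, so with this form sharedBuffer.toString() is built on every call to that checkState, whether or not the check fails. A minimal illustration (the huge object and its toString() are invented; the template overload is the Guava-style one in org.apache.flink.util.Preconditions):
>>>> > > 
>>>> > > import org.apache.flink.util.Preconditions;
>>>> > > 
>>>> > > public class EagerMessageDemo {
>>>> > >     public static void main(String[] args) {
>>>> > >         Object hugeObject = new Object() {
>>>> > >             @Override
>>>> > >             public String toString() {
>>>> > >                 // Runs in the eager variant below even though the
>>>> > >                 // check passes.
>>>> > >                 System.out.println("toString() called");
>>>> > >                 return "huge";
>>>> > >             }
>>>> > >         };
>>>> > > 
>>>> > >         // Eager: the message string (and hugeObject.toString()) is
>>>> > >         // computed before checkState is even entered, throw or no throw.
>>>> > >         Preconditions.checkState(true, "Could not find id for entry: " + hugeObject);
>>>> > > 
>>>> > >         // Lazy: the template is formatted only if the check fails.
>>>> > >         Preconditions.checkState(true, "Could not find id for entry: %s", hugeObject);
>>>> > >     }
>>>> > > }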
>>>> > > 
>>>> > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > > The watermark has not moved for this pattern to succeed (or otherwise). The issue, though, is that this happens pretty early in the pipe (within a minute or so). I am replaying from a Kafka topic, but the keyed operator has emitted no more than 1500-odd elements to SelectCEPOperator (very visible in the UI), so I am sure not enough elements have been added to the SharedBuffer to create memory stress.
>>>> > > 
>>>> > > The nature of the input stream is that events are pushed out with a specific timestamp (it is a time series, and the timestamp is the beginning of the time slot), so one will see a batch of elements with a constant timestamp until the next batch appears.
>>>> > > 
>>>> > > A batch, though, has no more elements than the number of keys (600).
>>>> > > 
>>>> > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > > This is a pretty simple pattern, as in I hardly have 1500 elements (across 600 keys at the max) put in, and though I have a pretty wide range, as in I am looking at a relaxed pattern (like 40 true conditions in 6 hours), I get this. I have EventTime turned on.
>>>> > > 
>>>> > > java.lang.OutOfMemoryError: Java heap space
>>>> > >     at java.util.Arrays.copyOf(Arrays.java:3332)
>>>> > >     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>>> > >     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>> > >     at java.lang.String.valueOf(String.java:2994)
>>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>>>> > >     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>> > >     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>> > >     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>>>> > >     .
>>>> > >     .
>>>> > >     .
>>>> > > 
>>>> > > Has anyone seen this issue?