Hi Vishal,

A stopState is a state that invalidates a partial match, e.g. a.notFollowedBy(b).followedBy(c). If you have an “a” and then you see a “b”, the partial match is invalidated.
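For illustration, a minimal sketch of such a pattern ( Event and its getType() accessor are placeholders, not types from this thread ):

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// An "a" starts a partial match; a later "b" sends the partial match to
// the stop state ( invalidating it ) before any "c" can complete it.
Pattern<Event, ?> pattern = Pattern.<Event>begin("a")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "a".equals(e.getType());
            }
        })
        .notFollowedBy("b")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "b".equals(e.getType());
            }
        })
        .followedBy("c")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "c".equals(e.getType());
            }
        });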
A finalState is the state where a match has been found.

Kostas

> On Mar 7, 2018, at 3:20 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
> Absolutely. For one, a simple "m out of n true conditions", where n is defined
> by a range, is a little under-optimized: just using times(m) will not
> short-circuit the partial matches until the time range is exhausted, even if
> there is no way m true conditions can still be achieved ( we have already had
> n-m false conditions ). That makes sense, as we have defined a within()
> condition predicated on n.
>
> I think the way one would do it is to use an iterative condition and look at
> all events ( including the false ones, though that can be expensive ) and stop
> the pattern. One question I had is that an NFA can be in a FinalState or a
> StopState.
>
> What would constitute a StopState ?
>
> On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Hi Vishal,
>
> Thanks a lot for sharing your experience and the potential caveats to
> consider when specifying your pattern.
>
> I agree that there is room for improvement when it comes to the state
> checkpointed in Flink. We already have some ideas but still, as you also
> said, the bulk of the space consumption comes from the pattern definition,
> so it would be nice if more people did the same, i.e. shared their
> experience and, why not, compiled a guide of things to avoid to put along
> the rest of the FlinkCEP documentation.
>
> What do you think?
>
> Kostas
>
>
>> On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>> Hello all. There were recent changes to the flink master that I pulled in,
>> and they seem to have solved our issue.
>>
>> A few points:
>>
>> * CEP is heavy, as the NFA keeps the transition matrix as state, which can
>> be possibly n^2 ( worst case ) and can easily blow up space requirements.
>> The after-match skip strategy is likely to play a crucial role in keeping
>> the state lean:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy.
>> In our case we do not require partial matches within a match to contribute
>> to another potential match ( noise for us ), and thus SKIP_PAST_LAST_EVENT
>> was used, which on match will prune the SharedBuffer ( almost reset it ).
>>
>> * The argument that pattern events should be lean holds even more in CEP,
>> due to the potential exponential increase in space requirements.
>>
>> * The nature of the pattern will require consideration if state does blow
>> up for you.
>>
>> Apart from that, I am still not sure why toString() on SharedBuffer was
>> called to get an OOM to begin with.
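On the SKIP_PAST_LAST_EVENT point above, a rough sketch of wiring that strategy into a pattern ( the AfterMatchSkipStrategy package has moved between releases, and Event/isTrueCondition() are placeholders ):

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

// After each full match, partial matches that overlap it are discarded,
// so the SharedBuffer is pruned instead of feeding further matches.
Pattern<Event, ?> pattern = Pattern
        .<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return e.isTrueCondition();  // placeholder predicate
            }
        })
        .times(20)
        .within(Time.hours(8));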
>> On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> We could not recreate this in a controlled setup, but here are a few notes
>> that we have gathered on a simple "times(n),within(..)" pattern.
>>
>> In the case where the Event does not create a Final or Stop state:
>>
>> * As the NFA processes an Event, the NFA mutates if the Event is true. Each
>> computation is a counter that keeps track of the partial matches, each true
>> Event extending every already existent partial match for that computation
>> unit. Essentially, for n Events, if each Event is true, there will be
>> roughly n-1 computations, each representing a starting Event from 1 to n-1
>> ( so the 1st will have n-1 events in its partial match, the 2nd has n-2
>> events, and so on, until the (n-1)th has just the last event as a partial
>> match ).
>>
>> * If the WM progresses beyond the ts of the 1st computation, that partial
>> match is pruned.
>>
>> * It makes sure that a SharedBufferEntry is pruned only if the count of
>> Edges originating from it reduces to 0 ( the internalRemove(), which uses a
>> Stack ), which should happen as the WM keeps progressing to the nth element
>> for unfulfilled patterns. A "null" ( not a fan ) event is used to establish
>> WM progression.
>>
>> In the case where there is a FinalState ( and we skipToFirstAfterLast ):
>>
>> * The NFA will prune ( release ) all partial matches, prune the shared
>> buffer, and emit the current match. The computations should now be empty.
>>
>> There is a lot to it, but is that roughly what is done in that code ?
>>
>> A few questions:
>>
>> * What we have seen is that the call to the toString() method of
>> SharedBuffer is where the OOM occurs. In the code there is no call to a
>> Log, so we are not sure where, or by whom, that method is called. Surely it
>> is not part of the serialization/deserialization routine, or is it ( very
>> surprising if it is )?
>>
>> * There is no out-of-the-box implementation of an "m out of n" pattern
>> match. We have to resort to n in range ( m * time series slot ), which we
>> do. This is fine, but it does not allow an optimization: once n-m false
>> conditions have been seen, there is no way that out of n there will ever be
>> m trues, and thus the SharedBuffer could be pruned up to the last true seen
>> ( very akin to skipToFirstAfterLast ).
>>
>> We will keep instrumenting the code ( which, apart from the null message,
>> is easily understandable ) but would love to hear your feedback.
>>
>> On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> Thanks a lot Vishal!
>>
>> We are looking forward to a test case that reproduces the failure.
>>
>> Kostas
>>
>>> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>> This is the pattern. Will create a test case.
>>>
>>> /**
>>>  * @param condition a single condition applied as an acceptance criterion
>>>  * @param params    defines the bounds of the pattern
>>>  * @param <U>       the element in the stream
>>>  * @return the compiled pattern along with the params
>>>  */
>>> public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
>>>         SimpleCondition<U> condition,
>>>         RelaxedContiguityWithinTime params,
>>>         RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
>>>         String patternId) {
>>>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>>>     Pattern<U, ?> pattern = Pattern.<U>begin(START).where(condition);
>>>     if (params.elementCount > 1) {
>>>         pattern = pattern
>>>                 .followedBy(REST)
>>>                 .where(condition)
>>>                 .times(params.elementCount - 1);
>>>     }
>>>     return new RelaxedContiguousPattern<U>(
>>>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>>>             params,
>>>             params.elementCount > 1,
>>>             params.period.duration,
>>>             mapFunc,
>>>             patternId);
>>> }
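On the "m out of n" short-circuit idea above, a rough sketch of the counting logic an iterative condition might use, assuming a looping state named "all" that accepts every event, and a placeholder Event#isTrueCondition(). Note that a rejected event only fails this transition; actually discarding the partial match still needs a stop state:

import org.apache.flink.cep.pattern.conditions.IterativeCondition;

public class MOutOfNCondition extends IterativeCondition<Event> {

    private final int m;  // required true conditions
    private final int n;  // window length in events

    public MOutOfNCondition(int m, int n) {
        this.m = m;
        this.n = n;
    }

    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        // Count the events accepted so far under "all", plus the current one.
        int seen = 1;
        int trues = value.isTrueCondition() ? 1 : 0;
        for (Event e : ctx.getEventsForPattern("all")) {
            seen++;
            if (e.isTrueCondition()) {
                trues++;
            }
        }
        // Once more than n - m falses have been seen, m trues out of n are
        // unreachable, so stop accepting events into this partial match.
        return (seen - trues) <= (n - m);
    }
}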
>>> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>> Could you provide some example to reproduce the case? Or the Pattern that
>>> you are using? It would help track down the issue.
>>>
>>> > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> >
>>> > I have pulled in the flink master CEP library, and the runtime ( the
>>> > cluster ) is configured to work against the latest and greatest. This
>>> > does not happen with smaller range patterns ( 3 out of 5, 1 out of 3,
>>> > etc. ) but is always an issue when it is a larger range ( 20 out of 25
>>> > with a range of 8 hours ). Does that make sense?
>>> >
>>> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>> > This problem sounds very similar to this one, which was fixed for 1.4.1
>>> > and 1.5.0: https://issues.apache.org/jira/browse/FLINK-8226
>>> >
>>> > Could you check if that helps with your problem too?
>>> >
>>> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > >
>>> > > I have the flink master CEP library code imported into a 1.4 build.
>>> > >
>>> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > A new one:
>>> > >
>>> > > java.lang.OutOfMemoryError: Java heap space
>>> > >     at java.util.Arrays.copyOf(Arrays.java:3332)
>>> > >     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>> > >     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:136)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >     at java.lang.String.valueOf(String.java:2994)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>> > >     at java.lang.String.valueOf(String.java:2994)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >     at java.lang.String.valueOf(String.java:2994)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>> > >     .
>>> > >     .
>>> > >     .
>>> > >
>>> > > It is the toString() on SharedBuffer, no doubt. Some recursive loop ?
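For what it is worth, the alternating SharedBufferEntry.toString / SharedBufferEdge.toString frames above have exactly the shape of two toString() methods that call back into each other. A minimal illustration ( not Flink code ) of how such a pair can build an enormous string on a large or cyclic graph:

import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;

class Entry {
    final List<Edge> edges = new ArrayList<>();

    @Override
    public String toString() {
        // Joining the edges invokes Edge.toString() on each of them.
        return "Entry(" + StringUtils.join(edges, ", ") + ")";
    }
}

class Edge {
    Entry target;

    @Override
    public String toString() {
        // Appending the target recurses back into Entry.toString(), so
        // printing one entry walks every reachable entry and edge.
        return "Edge(" + target + ")";
    }
}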
>>> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > It happens when it is about to throw an exception and calls
>>> > > sharedBuffer.toString(), because of this check:
>>> > >
>>> > > int id = sharedBuffer.entryId;
>>> > > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
>>> > >
>>> > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > The watermark has not moved for this pattern to succeed ( or otherwise ).
>>> > > The issue, though, is that this happens pretty early in the pipe ( like
>>> > > within a minute ). I am replaying from a kafka topic, but the keyed
>>> > > operator has emitted no more than 1500-plus elements to
>>> > > SelectCEPOperator ( very visible on the UI ), so I am sure not enough
>>> > > elements have been added to the SharedBuffer to create memory stress.
>>> > >
>>> > > The nature of the input stream is that events are pushed out with a
>>> > > specific timestamp ( it is a time series, and the timestamp is the
>>> > > beginning of the time slot ), as in one will have a bunch of elements
>>> > > with a constant timestamp until the next batch appears.
>>> > >
>>> > > A batch, though, does not have more than the number of keys elements ( 600 ).
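A minimal sketch of the slot-aligned event-time assignment described above ( input, Event, and getSlotStartMillis() are placeholders; the timestamps are non-decreasing, so an ascending extractor fits ):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

// Every element of a batch carries the start of its time slot, so the
// watermark only advances when a batch for a new slot arrives.
DataStream<Event> withTimestamps = input
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
            @Override
            public long extractAscendingTimestamp(Event e) {
                return e.getSlotStartMillis();
            }
        });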
>>> > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > This is a pretty simple pattern: I hardly have 1500 elements ( across
>>> > > 600 keys at the max ) put in, and though I have a pretty wide range, as
>>> > > in I am looking at a relaxed pattern ( like 40 true conditions in 6
>>> > > hours ), I get this. I have EventTime turned on.
>>> > >
>>> > > java.lang.OutOfMemoryError: Java heap space
>>> > >     at java.util.Arrays.copyOf(Arrays.java:3332)
>>> > >     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>> > >     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:136)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >     at java.lang.String.valueOf(String.java:2994)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>>> > >     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>> > >     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>> > >     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>>> > >     .
>>> > >     .
>>> > >     .
>>> > >
>>> > > Has anyone seen this issue ?