Absolutely. For one, a simple "m out of n true conditions" pattern, where n is defined by the time range, is a little under-optimized: just using times(m) will not short-circuit partial matches until the time range has elapsed, even when there is no way m true conditions can still be achieved (we have already seen n - m false conditions). That makes sense, since we have defined a within() condition predicated on n.
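For reference, a minimal sketch of the "m trues within a range" shape discussed above, assuming a hypothetical MyEvent type with an isTrueCondition() accessor; m and rangeHours are illustrative (e.g. the "20 out of 25 within 8 hours" case mentioned later in the thread). Nothing in this pattern can short-circuit a partial match once n - m false slots have been seen; only the within() timeout eventually prunes it.

    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // "m true conditions somewhere inside the time range that covers n slots".
    Pattern<MyEvent, ?> mOutOfN = Pattern.<MyEvent>begin("slots")
            .where(new SimpleCondition<MyEvent>() {
                @Override
                public boolean filter(MyEvent event) {
                    return event.isTrueCondition();   // hypothetical accessor
                }
            })
            .times(m)                                  // m true conditions, e.g. 20 ...
            .within(Time.hours(rangeHours));           // ... within the range derived from n, e.g. 8 hours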
I think the way one would do it is with an iterative condition that looks at all events (including the false ones, which can be expensive) and stops the pattern. One question I had: an NFA can be in a FinalState or a StopState. What would constitute a StopState?
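A minimal sketch of the iterative-condition idea above, with the same hypothetical MyEvent type and a looping state named "slots"; maxFalse would be n - m. Note that IterativeCondition's Context only exposes events already accepted into the pattern, so the "false" events have to be matched into the looping state too, which is exactly the expensive part mentioned above. This is a sketch of the inspection mechanism, not a complete "m out of n" implementation.

    import org.apache.flink.cep.pattern.conditions.IterativeCondition;

    public class PrunableSlotCondition extends IterativeCondition<MyEvent> {

        private final int maxFalse;   // n - m: once exceeded, m trues are unreachable

        public PrunableSlotCondition(int maxFalse) {
            this.maxFalse = maxFalse;
        }

        @Override
        public boolean filter(MyEvent event, Context<MyEvent> ctx) throws Exception {
            // Count the false events already accepted into the looping "slots" state,
            // plus the current one if it is false.
            int falseSoFar = event.isTrueCondition() ? 0 : 1;
            for (MyEvent accepted : ctx.getEventsForPattern("slots")) {
                if (!accepted.isTrueCondition()) {
                    falseSoFar++;
                }
            }
            // Reject the event once m trues can no longer be reached. Rejecting only
            // stops this event from extending the partial match; the partial match
            // itself still lives until within() fires.
            return falseSoFar <= maxFalse;
        }
    }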
On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Vishal,

Thanks a lot for sharing your experience and the potential caveats to consider when specifying your pattern.

I agree that there is room for improvement when it comes to the state checkpointed in Flink. We already have some ideas, but still, as you also said, the bulk of the space consumption comes from the pattern definition, so it would be nice if more people did the same, i.e. shared their experience and, why not, compiled a guide of things to avoid to put along the rest of the FlinkCEP documentation.

What do you think?

Kostas

On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Hello all. There were recent changes to the flink master that I pulled in, and that *seems* to have solved our issue.

A few points:

* CEP is heavy, as the NFA keeps the transition matrix as state, which can be possibly n^2 (worst case) and can easily blow up space requirements. The after-match skip strategy is likely to play a crucial role in keeping the state lean: https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy. In our case we do not require partial matches within a match to contribute to another potential match (noise for us), and thus *SKIP_PAST_LAST_EVENT* was used, which on match will prune the SharedBuffer (almost reset it).

* The argument that pattern events should be lean holds much more strongly in CEP, due to the potential exponential increase in space requirements.

* The nature of the pattern will require consideration if state does blow up for you.

Apart from that, I am still not sure why toString() on SharedBuffer was called to get an OOM to begin with.

On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

We could not recreate this in a controlled setup, but here are a few notes that we have gathered on a simple "times(n).within(..)" pattern.

In the case where the Event does not create a Final or Stop state:

* As the NFA processes an Event, it mutates if the Event is true. Each computation is a counter that keeps track of a partial match, with each true Event appended to the already existing partial match for that computation unit. Essentially, for n Events, if each Event is true, there will be roughly n-1 computations, each representing an Event from 1 to n-1 (so the first will have n-1 events in its partial match, the second n-2, and so on, until the (n-1)th has only the last event as its partial match).

* If the WM progresses beyond the ts of the 1st computation, that partial match is pruned.

* It makes sure that a SharedBufferEntry is pruned only if the count of Edges originating from it reduces to 0 (the internalRemove(), which uses a Stack); this should happen as the WM keeps progressing towards the nth element for unfulfilled patterns. A "null" (not a fan) event is used to establish WM progression.

In the case where there is a FinalState (and we skipToFirstAfterLast):

* The NFA will prune (release) all partial matches, prune the shared buffer, and emit the current match. The computations now should be empty.

There is a lot to it, but is that roughly what is done in that code?

A few questions:

* What we have seen is that the call to the toString() method of SharedBuffer is where the OOM occurs. Now, in the code there is no call to a Log, so we are not sure why that method is called, or who calls it. Surely it is not part of the serialization/deserialization routine, or is it (very surprising if it is)?

* There is no out-of-the-box implementation of an "m out of n" pattern match. We have to resort to n in range (m * time series slot), which we do. This is fine, but what it does not allow is an optimization where one can prune once enough false conditions have been seen. Simply speaking, if n-m false conditions have been seen, there is no way that out of n there will ever be m trues, and thus the SharedBuffer can be pruned back to the last true seen (very akin to skipToFirstAfterLast).

We will keep instrumenting the code (which, apart from the null message, is easily understandable) but would love to hear your feedback.

On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Thanks a lot Vishal!

We are looking forward to a test case that reproduces the failure.

Kostas

On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

This is the pattern. Will create a test case.

    /**
     * @param condition a single condition applied as the acceptance criteria
     * @param params    defines the bounds of the pattern
     * @param <U>       the element in the stream
     * @return the compiled pattern along with the params
     */
    public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
            SimpleCondition<U> condition,
            RelaxedContiguityWithinTime params,
            RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
            String patternId) {
        assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
        Pattern<U, ?> pattern = Pattern.<U>begin(START).where(condition);
        if (params.elementCount > 1) {
            pattern = pattern.followedBy(REST).where(condition).times(params.elementCount - 1);
        }
        return new RelaxedContiguousPattern<U>(
                pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
                params,
                params.elementCount > 1,
                params.period.duration,
                mapFunc,
                patternId);
    }

On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:

Could you provide some example to reproduce the case? Or the Pattern that you are using? It would help track down the issue.

On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

I have pulled in the flink master CEP library, and the runtime (the cluster) is configured to work against the latest and greatest. This does not happen with smaller range patterns (3 out of 5, 1 out of 3, etc.) but is always an issue when it is a larger range (20 out of 25 with a range of 8 hours). Does that make sense?
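As an aside on the SKIP_PAST_LAST_EVENT strategy mentioned in the Mar 7 message above: it is attached when the pattern is declared. A minimal sketch with the same hypothetical MyEvent type and illustrative m / rangeHours; note that the AfterMatchSkipStrategy package has moved between Flink versions, so treat the import as indicative only.

    import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;   // package location differs in older versions
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Drop every partial match that overlaps a completed match, so the
    // SharedBuffer is (almost) reset after each emitted match.
    AfterMatchSkipStrategy skipPastLast = AfterMatchSkipStrategy.skipPastLastEvent();

    Pattern<MyEvent, ?> pattern = Pattern.<MyEvent>begin("slots", skipPastLast)
            .where(new SimpleCondition<MyEvent>() {
                @Override
                public boolean filter(MyEvent event) {
                    return event.isTrueCondition();
                }
            })
            .times(m)
            .within(Time.hours(rangeHours));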
On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:

This problem sounds very similar to this one, which was fixed for 1.4.1 and 1.5.0: https://issues.apache.org/jira/browse/FLINK-8226

Could you check if that helps with your problem too?

On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

I have the flink master CEP library code imported into a 1.4 build.

On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

A new one:

    java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3332)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
        at java.lang.StringBuilder.append(StringBuilder.java:136)
        at java.lang.StringBuilder.append(StringBuilder.java:131)
        at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
        at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
        at java.lang.String.valueOf(String.java:2994)
        at java.lang.StringBuilder.append(StringBuilder.java:131)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
        at java.lang.String.valueOf(String.java:2994)
        at java.lang.StringBuilder.append(StringBuilder.java:131)
        at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
        at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
        at java.lang.String.valueOf(String.java:2994)
        at java.lang.StringBuilder.append(StringBuilder.java:131)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
        .
        .
        .

It is the toString() on SharedBuffer, no doubt. Some recursive loop?

On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

It happens when it looks to throw an exception and calls sharedBuffer.toString(), because of the check:

    int id = sharedBuffer.entryId;
    Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
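One plausible reading of the snippet above (ordinary Java semantics, not a claim about the actual Flink fix): the error-message argument to checkState() is a string concatenation, so sharedBuffer.toString() runs on every call, whether or not the check fails. A minimal illustration; the wrapper method is hypothetical.

    import org.apache.flink.util.Preconditions;

    // Hypothetical wrapper, only to show the evaluation order of the arguments.
    static void eagerMessageDemo(int id, Object sharedBuffer) {
        // The argument "Could not find id for entry: " + sharedBuffer is built
        // BEFORE checkState() executes, so toString() on the (potentially huge)
        // SharedBuffer is invoked even when id != -1 and no exception is thrown.
        Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
    }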
On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

The watermark has not moved far enough for this pattern to succeed (or otherwise); the issue, though, is that it happens pretty early in the pipe (like within a minute). I am replaying from a kafka topic, but the keyed operator has emitted no more than 1500-plus elements to SelectCEPOperator (very visible on the UI), so I am sure not enough elements have been added to the SharedBuffer to create memory stress.

The nature of the input stream is that events are pushed out with a specific timestamp (it is a time series and the timestamp is the beginning of the time slot), as in one will have a bunch of elements that share a constant timestamp until the next batch appears.

A batch, though, does not have more elements than the number of keys (600).

On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

This is a pretty simple pattern, as in I hardly have 1500 elements (across 600 keys at the max) put in, and though I have a pretty wide range, as in I am looking at a relaxed pattern (like 40 true conditions in 6 hours), I get this. I have EventTime turned on.

    java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3332)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
        at java.lang.StringBuilder.append(StringBuilder.java:136)
        at java.lang.StringBuilder.append(StringBuilder.java:131)
        at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
        at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
        at java.lang.String.valueOf(String.java:2994)
        at java.lang.StringBuilder.append(StringBuilder.java:131)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
        .
        .
        .

Has anyone seen this issue?
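Side note on the event-time setup described above: since every batch carries the slot-start timestamp, timestamps are non-decreasing, so an ascending-timestamp assigner is one way to wire it. A minimal sketch; MyEvent and getSlotTimestamp() are hypothetical, and `events` stands for the input DataStream.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

    DataStream<MyEvent> withTimestamps = events.assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<MyEvent>() {
                @Override
                public long extractAscendingTimestamp(MyEvent element) {
                    // All events in a batch share the slot-start timestamp, so the
                    // extracted timestamps never decrease across batches.
                    return element.getSlotTimestamp();
                }
            });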