We could not recreate this in a controlled setup, but here are a few notes that we have gathered on a simple times(n).within(..) pattern.
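For concreteness, a minimal sketch of the shape of pattern we mean (Event and isTrueCondition() are hypothetical stand-ins; the counts echo the "20 out of 25 with a range of 8 hours" case further down the thread):

    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.windowing.time.Time;

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
            .where(new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event e) {
                    return e.isTrueCondition(); // hypothetical predicate
                }
            })
            .times(20)                // the times(n) part
            .within(Time.hours(8));   // the within(..) part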
In the case where an Event does not create a Final or Stop state:

* As the NFA processes an Event, the NFA mutates if the Event evaluates to true. Each computation is a counter that keeps track of a partial match, each true Event being appended to the already existing partial match for that computation unit. Essentially, for n Events, if each Event evaluates to true, there will be roughly n-1 computations, each representing an Event from 1 to n-1 (so the 1st will have n-1 events in its partial match, the 2nd has n-2 events, and so on, until the (n-1)th has only the last event as its partial match).

* If the WM progresses beyond the ts of the 1st computation, that partial match is pruned.

* It makes sure that a SharedBufferEntry is pruned only if the count of Edges originating from it drops to 0 (the internalRemove(), which uses a Stack); this should happen as the WM keeps progressing to the nth element for unfulfilled patterns. A "null" (not a fan) event is used to establish WM progression.

In case there is a FinalState (and we skipToFirstAfterLast):

* The NFA will prune (release) all partial matches, prune the shared buffer, and emit the current match. The computations should now be empty.

There is a lot to it, but is that roughly what is done in that code? A few questions:

* What we have seen is that the call to the toString method of SharedBuffer is where the OOM occurs. In the code there is no call to a Log, so we are not sure why that method is invoked, or who calls it. Surely it is not part of the Serialization/Deserialization routine, or is it (very surprising if it is)? See the sketch after these notes.

* There is no out-of-the-box implementation of an "m out of n" pattern match. We have to resort to n in range (m * time series slot), which we do. This is fine, but it does not allow one optimization: pruning once enough false conditions have been seen. Simply put, once more than n-m falses have been seen, there is no way m trues out of n will ever occur, and thus the SharedBuffer can be pruned up to the last true seen (very akin to skipToFirstAfterLast).

We will keep instrumenting the code (which, apart from the null message, is easily understandable) but would love to hear your feedback.
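On the toString question, one observation from the stack traces further down: SharedBufferEntry.toString is reached from SharedBufferSerializer.serialize, and the precondition check quoted at the bottom of the thread builds its message by string concatenation. A minimal sketch of why that alone puts toString on the hot path (identifiers mirror the quoted check, Preconditions being org.apache.flink.util.Preconditions; the guarded variant is our hypothetical alternative, not what the code does):

    int id = sharedBuffer.entryId;
    // The message argument is built by concatenation at the call site, so
    // "..." + sharedBuffer -- and hence sharedBuffer.toString() -- is
    // evaluated BEFORE checkState is entered, on every call, even when
    // id != -1 and nothing is thrown.
    Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);

    // Hypothetical guarded form: toString() would only run on the failure path.
    if (id == -1) {
        throw new IllegalStateException("Could not find id for entry: " + sharedBuffer);
    }

If that check sits on the serialize path, toString runs on every state update, and the mutually recursive SharedBufferEntry.toString / SharedBufferEdge.toString over a large buffer would explain the heap blow-up.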
On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Thanks a lot Vishal!
>
> We are looking forward to a test case that reproduces the failure.
>
> Kostas
>
> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
> This is the pattern. Will create a test case.
>
> /**
>  * @param condition a single condition applied as an acceptance criterion
>  * @param params defines the bounds of the pattern.
>  * @param <U> the element type in the stream
>  * @return compiled pattern along with the params.
>  */
> public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
>         SimpleCondition<U> condition,
>         RelaxedContiguityWithinTime params,
>         RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
>         String patternId) {
>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>     Pattern<U, ?> pattern = Pattern.<U>begin(START).where(condition);
>     if (params.elementCount > 1) {
>         pattern = pattern
>                 .followedBy(REST)
>                 .where(condition)
>                 .times(params.elementCount - 1);
>     }
>     return new RelaxedContiguousPattern<U>(
>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>             params,
>             params.elementCount > 1,
>             params.period.duration,
>             mapFunc,
>             patternId);
> }
>
> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>
>> Could you provide some example to reproduce the case? Or the Pattern that
>> you are using? It would help track down the issue.
>>
>> > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> >
>> > I have pulled in the flink master cep library, and the runtime (the
>> > cluster) is configured to work against the latest and greatest. This does
>> > not happen with smaller range patterns (3 out of 5, 1 of 3, etc.) but is
>> > always an issue when it is a larger range (20 out of 25 with a range of
>> > 8 hours). Does that make sense?
>> >
>> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>> > This problem sounds very similar to this one that was fixed for 1.4.1
>> > and 1.5.0:
>> > https://issues.apache.org/jira/browse/FLINK-8226
>> >
>> > Could you check if that helps with your problem too?
>> >
>> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> > >
>> > > I have flink master CEP library code imported to a 1.4 build.
>> > >
>> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> > > A new one
>> > >
>> > > java.lang.OutOfMemoryError: Java heap space
>> > >     at java.util.Arrays.copyOf(Arrays.java:3332)
>> > >     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>> > >     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>> > >     at java.lang.StringBuilder.append(StringBuilder.java:136)
>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>> > >     at java.lang.String.valueOf(String.java:2994)
>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>> > >     at java.lang.String.valueOf(String.java:2994)
>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>> > >     at java.lang.String.valueOf(String.java:2994)
>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>> > >     .
>> > >     .
>> > >     .
>> > >
>> > > It is the toString() on SharedBuffer, no doubt. Some recursive loop?
>> > >
>> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> > > It happens when it looks to throw an exception and calls
>> > > sharedBuffer.toString, b'coz of the check....
>> > >
>> > > int id = sharedBuffer.entryId;
>> > > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
>> > >
>> > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> > > The watermark has not moved for this pattern to succeed (or otherwise).
>> > > The issue, though, is that it happens pretty early in the pipe (like
>> > > within a minute). I am replaying from a kafka topic, but the keyed
>> > > operator has emitted no more than 1500-plus elements to SelectCEPOperator
>> > > (very visible in the UI), so I am sure not enough elements have been
>> > > added to the SharedBuffer to create memory stress.
>> > >
>> > > The nature of the input stream is that events are pushed out with a
>> > > specific timestamp (it is a time series, and the timestamp is the
>> > > beginning of the time slot), as in one will have a bunch of elements
>> > > with a constant timestamp till the next batch appears.
>> > >
>> > > A batch, though, does not have more than the number of keys elements (600).
>> > >
>> > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> > > This is a pretty simple pattern, as in I hardly have 1500 elements
>> > > (across 600 keys at the max) put in, and though I have a pretty wide
>> > > range, as in I am looking at a relaxed pattern (like 40 true conditions
>> > > in 6 hours), I get this. I have EventTime turned on.
>> > >
>> > > java.lang.OutOfMemoryError: Java heap space
>> > >     at java.util.Arrays.copyOf(Arrays.java:3332)
>> > >     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>> > >     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>> > >     at java.lang.StringBuilder.append(StringBuilder.java:136)
>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>> > >     at java.lang.String.valueOf(String.java:2994)
>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>> > >     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>> > >     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>> > >     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>> > >     .
>> > >     .
>> > >     .
>> > >
>> > > Has anyone seen this issue?