Hello all,

There were recent changes to the Flink master that I pulled in, and they *seem* to have solved our issue.
A few points:

* CEP is heavy: the NFA keeps its transition matrix as state, which can be
O(n^2) in the worst case and can easily blow up space requirements. The
after-match skip strategy is likely to play a crucial role in keeping the
state lean:
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy
In our case we do not need partial matches within a match to contribute to
another potential match (noise for us), so *SKIP_PAST_LAST_EVENT* was used,
which on a match prunes the SharedBuffer (almost resets it). A sketch of the
wiring follows below.

* The argument that pattern events should be lean holds even more strongly
in CEP, given the potential exponential increase in space requirements.

* The nature of the pattern will require consideration if state does blow
up for you.

Apart from that, I am still not sure why toString() on SharedBuffer was
called to get an OOM to begin with.
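For reference, this is roughly how we wire the skip strategy in. It is a
minimal sketch rather than our production code: the Event POJO and its
isHit() accessor are stand-ins, and the AfterMatchSkipStrategy package has
moved between Flink versions, so the import may need adjusting.

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LeanPattern {

    // Stand-in event type, for illustration only.
    public static class Event {
        private final boolean hit;
        public Event(boolean hit) { this.hit = hit; }
        public boolean isHit() { return hit; }
    }

    public static Pattern<Event, ?> build() {
        // SKIP_PAST_LAST_EVENT: once a match fires, discard the partial
        // matches it overlaps, which prunes (almost resets) the SharedBuffer.
        AfterMatchSkipStrategy skip = AfterMatchSkipStrategy.skipPastLastEvent();
        return Pattern.<Event>begin("start", skip)
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return e.isHit();
                    }
                })
                .times(20)              // e.g. 20 accepting events...
                .within(Time.hours(8)); // ...inside an 8 hour window
    }
}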
On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> We could not recreate it in a controlled setup, but here are a few notes
> that we have gathered on a simple "times(n).within(..)".
>
> In the case where the Event does not create a Final or Stop state:
>
> * As an NFA processes an Event, the NFA mutates if there is a true Event.
> Each computation is a counter that keeps track of a partial match, with
> each true Event appended to the already existent partial match for that
> computation unit. Essentially, for n Events, if each Event is true, there
> will be roughly n-1 computations, each representing a start Event from 1
> to n-1 (so the first will hold n-1 events in its partial match, the second
> n-2 events, and so on, with the (n-1)th holding just the last event).
>
> * If the WM progresses beyond the ts of the 1st computation, that partial
> match is pruned.
>
> * It makes sure that a SharedBufferEntry is pruned only if the count of
> Edges originating from it reduces to 0 (the internalRemove(), which uses a
> Stack), which should happen as the WM keeps progressing to the nth element
> for unfulfilled patterns. A "null" (not a fan) event is used to establish
> WM progression.
>
> In the case where there is a FinalState (and we skipToFirstAfterLast):
>
> * The NFA will prune (release) all partial matches, prune the shared
> buffer, and emit the current match. The computations should now be empty.
>
> There is a lot to it, but is that roughly what is done in that code?
>
> A few questions:
>
> * What we have seen is that the call to the toString() method of
> SharedBuffer is where the OOM occurs. In the code there is no call to a
> logger, so we are not sure why, or by whom, that method is called. Surely
> that is not part of the serialization/deserialization routine, or is it
> (very surprising if it is)?
>
> * There is no out-of-the-box implementation of an "m out of n" pattern
> match. We have to resort to n within a range of (m * time series slot),
> which we do. This is fine, but what it does not allow is an optimization
> where, once enough false conditions are seen, one can prune. Simply
> speaking, once n-m falses have been seen there is no way that there will
> ever be m trues out of n, and thus the SharedBuffer can be pruned up to
> the last true seen (very akin to skipToFirstAfterLast).
>
> We will keep instrumenting the code (which, apart from the null message,
> is easily understandable) but would love to hear your feedback.
>
> On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
>> Thanks a lot Vishal!
>>
>> We are looking forward to a test case that reproduces the failure.
>>
>> Kostas
>>
>> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>> This is the pattern. Will create a test case.
>>
>> /**
>>  * @param condition a single condition applied as the acceptance criterion
>>  * @param params    the bounds of the pattern
>>  * @param <U>       the element type in the stream
>>  * @return the compiled pattern along with the params
>>  */
>> public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
>>         SimpleCondition<U> condition,
>>         RelaxedContiguityWithinTime params,
>>         RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
>>         String patternId) {
>>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>>     Pattern<U, ?> pattern = Pattern.<U>begin(START).where(condition);
>>     if (params.elementCount > 1) {
>>         pattern = pattern.followedBy(REST).where(condition).times(params.elementCount - 1);
>>     }
>>     return new RelaxedContiguousPattern<U>(
>>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>>             params,
>>             params.elementCount > 1,
>>             params.period.duration,
>>             mapFunc,
>>             patternId);
>> }
>>
>> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>
>>> Could you provide some example to reproduce the case? Or the Pattern
>>> that you are using? It would help track down the issue.
>>>
>>> On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> >
>>> > I have pulled in the flink master CEP library, and the runtime (the
>>> > cluster) is configured to work against the latest and greatest. This
>>> > does not happen with smaller-range patterns (3 out of 5, 1 out of 3,
>>> > etc.) but is always an issue when it is a larger range (20 out of 25
>>> > with a range of 8 hours). Does that make sense?
>>> >
>>> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>> > This problem sounds very similar to this one, which was fixed for
>>> > 1.4.1 and 1.5.0:
>>> > https://issues.apache.org/jira/browse/FLINK-8226
>>> >
>>> > Could you check if that helps with your problem too?
>>> >
>>> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > >
>>> > > I have flink master CEP library code imported to a 1.4 build.
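(To make the "m out of n" shape discussed above concrete: there is no
built-in quantifier for it, so it gets approximated as "m accepting events
within a window spanning n slots". Below is a rough sketch against the
plain CEP API; slotMinutes is an assumed slot size, and this is exactly the
shape that cannot prune early once n-m falses have been seen.)

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MOutOfN {
    // Approximate "m trues out of n slots": demand m accepting events inside
    // a window of n * slotMinutes. Contiguity between the accepted events is
    // relaxed by default, so false events in between are simply skipped.
    public static <U> Pattern<U, ?> mOutOfN(
            SimpleCondition<U> condition, int m, int n, long slotMinutes) {
        return Pattern.<U>begin("start")
                .where(condition)
                .times(m)
                .within(Time.minutes(n * slotMinutes));
    }
}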
>>> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > A new one:
>>> > >
>>> > > java.lang.OutOfMemoryError: Java heap space
>>> > >     at java.util.Arrays.copyOf(Arrays.java:3332)
>>> > >     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>> > >     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:136)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >     at java.lang.String.valueOf(String.java:2994)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>> > >     at java.lang.String.valueOf(String.java:2994)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >     at java.lang.String.valueOf(String.java:2994)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>> > >     .
>>> > >     .
>>> > >     .
>>> > >
>>> > > It is the toString() on SharedBuffer, no doubt. Some recursive loop?
>>> > >
>>> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > It happens when it looks to throw an exception and calls
>>> > > sharedBuffer.toString(), because of the check:
>>> > >
>>> > > int id = sharedBuffer.entryId;
>>> > > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
>>> > >
>>> > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > The watermark has not moved for this pattern to succeed (or
>>> > > otherwise); the issue, though, is that this happens pretty early in
>>> > > the pipe (like within a minute). I am replaying from a Kafka topic,
>>> > > but the keyed operator has emitted no more than 1500-plus elements to
>>> > > SelectCEPOperator (very visible on the UI), so I am sure not enough
>>> > > elements have been added to the SharedBuffer to create memory stress.
>>> > >
>>> > > The nature of the input stream is that events are pushed out with a
>>> > > specific timestamp (it is a time series and the timestamp is the
>>> > > beginning of the time slot), as in one will have a bunch of elements
>>> > > that share a constant timestamp until the next batch appears.
>>> > >
>>> > > A batch, though, does not have more elements than the number of keys
>>> > > (600).
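(In-line note on the event-time wiring for an input like that: the
slot-start timestamps never decrease, so an ascending extractor is enough
to keep the watermark moving, and watermark progress is what lets the NFA
prune stale partial matches. A sketch only; the Event type and its
getSlotStartMillis() accessor are assumed stand-ins for the actual element.)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

public class EventTimeWiring {

    // Stand-in element type, for illustration only.
    public static class Event {
        private final long slotStartMillis;
        public Event(long slotStartMillis) { this.slotStartMillis = slotStartMillis; }
        public long getSlotStartMillis() { return slotStartMillis; }
    }

    public static DataStream<Event> withEventTime(DataStream<Event> events) {
        // Timestamps are constant within a batch and jump forward at the
        // next slot, i.e. they are non-decreasing, which is exactly what
        // this extractor expects.
        return events.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Event>() {
                    @Override
                    public long extractAscendingTimestamp(Event e) {
                        return e.getSlotStartMillis();
                    }
                });
    }
}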
>>> > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > This is a pretty simple pattern, as in I hardly have 1500 elements
>>> > > (across 600 keys at the max) put in, and though I have a pretty wide
>>> > > range, as in I am looking at a relaxed pattern (like 40 true
>>> > > conditions in 6 hours), I get this. I have EventTime turned on.
>>> > >
>>> > > java.lang.OutOfMemoryError: Java heap space
>>> > >     at java.util.Arrays.copyOf(Arrays.java:3332)
>>> > >     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>> > >     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:136)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>> > >     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >     at java.lang.String.valueOf(String.java:2994)
>>> > >     at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>>> > >     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>>> > >     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>> > >     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>> > >     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>>> > >     .
>>> > >     .
>>> > >     .
>>> > >
>>> > > Has anyone seen this issue?
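Coming back to the open question of why toString() shows up under
serialize() without any logging: the check quoted in the 5:17 PM message
builds its error message by string concatenation, and Java evaluates that
argument before checkState() ever runs, so sharedBuffer.toString() executes
on every call, even when the check passes. A minimal, self-contained
illustration of that behavior (not Flink's actual code):

import org.apache.flink.util.Preconditions;

public class EagerMessageDemo {
    public static void main(String[] args) {
        Object sharedBuffer = new Object() {
            @Override
            public String toString() {
                System.out.println("toString() invoked");
                return "potentially enormous dump";
            }
        };
        int id = 42; // the check passes, and yet...

        // Eager: the message is concatenated BEFORE checkState() is entered,
        // so toString() runs on every call, success path included.
        Preconditions.checkState(id != -1,
                "Could not find id for entry: " + sharedBuffer);

        // Deferred: the template is formatted only if the check fails, so
        // toString() is never invoked here.
        Preconditions.checkState(id != -1,
                "Could not find id for entry: %s", sharedBuffer);
    }
}

If that reading is right, it would also explain hitting the OOM on the
success path: a large SharedBuffer gets stringified on every serialize,
whether or not anything is actually wrong.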