Will do.

On Wed, Mar 7, 2018 at 9:33 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
Why not open a JIRA and work on adding the debug statements that you consider useful?

This could help the next user that faces the same issues ;)

Kostas

On Mar 7, 2018, at 3:29 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Aah, yes, we never had a sink state, so we never came across a case where one was exercised. When the range expires, the result is a prune rather than a stop state (we were expecting a stop state), which is somewhat misleading if we take a stop state to mean one "that invalidates a partial match", whatever the reason may be.

Again, I would also suggest (though it is not a biggie) that strategic debug statements in the CEP core would help folks see what actually happens. We instrumented the code to follow the construction of the NFA, and that was very helpful.

On Wed, Mar 7, 2018 at 9:23 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Vishal,

A stopState is a state that invalidates a partial match, e.g. a.notFollowedBy(b).followedBy(c). If you have an "a" and then you see a "b", then you invalidate the pattern.

A finalState is the one where a match has been found.

Kostas
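As a concrete illustration of the stop state Kostas describes, here is a minimal sketch in FlinkCEP terms. The Event POJO and its "type" field are illustrative, not from this thread; only the Pattern API calls are real:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

public class StopStateSketch {

    // Hypothetical event type, for illustration only.
    public static class Event {
        public String type;
    }

    public static Pattern<Event, ?> pattern() {
        // An "a" followed by a "b" (before any "c") drives the partial match
        // into a stop state and invalidates it; an "a" followed by a "c" with
        // no "b" in between reaches the final state and is emitted as a match.
        return Pattern.<Event>begin("a")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) { return "a".equals(e.type); }
                })
                .notFollowedBy("b")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) { return "b".equals(e.type); }
                })
                .followedBy("c")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) { return "c".equals(e.type); }
                });
    }
}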
On Mar 7, 2018, at 3:20 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Absolutely. For one, a simple "m out of n" true-conditions pattern, where n is defined by the range, is a little under-optimized: just using times(m) will not short-circuit the partial matches until the time range is exhausted, even when there is no way m true conditions can still be achieved (we have already had n-m false conditions). That makes sense, as we have defined a within() condition predicated on n.

I think the way one would do it is to use an iterative condition and look at all events (including the false ones, though that can be expensive) and stop a pattern. One question I had is that an NFA can be in a FinalState or a StopState.

What would constitute a StopState?
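The "iterative condition" idea could look roughly like the sketch below. This is only an approximation: the Event type, the stage name "rest", and the pruning rule are assumptions, and an IterativeCondition only sees events already accepted for a stage (via ctx.getEventsForPattern()), so false events that were rejected are not visible to it, which is exactly the limitation noted above:

import org.apache.flink.cep.pattern.conditions.IterativeCondition;

public class EarlyStopSketch extends IterativeCondition<EarlyStopSketch.Event> {

    // Hypothetical event type, for illustration only.
    public static class Event {
        public long timestamp;
        public boolean isTrue;
    }

    private final long maxSpanMillis;

    public EarlyStopSketch(long maxSpanMillis) {
        this.maxSpanMillis = maxSpanMillis;
    }

    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        // Look back at what the (assumed) "rest" stage has already accepted.
        long firstTs = Long.MAX_VALUE;
        for (Event e : ctx.getEventsForPattern("rest")) {
            firstTs = Math.min(firstTs, e.timestamp);
        }
        // Nothing accepted yet: take the first true event.
        if (firstTs == Long.MAX_VALUE) {
            return value.isTrue;
        }
        // Stand-in pruning rule: stop extending the partial match once the
        // accepted events span more than the budget, rather than waiting
        // for the within() window to expire and prune it.
        return value.isTrue && (value.timestamp - firstTs <= maxSpanMillis);
    }
}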
On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Vishal,

Thanks a lot for sharing your experience and the potential caveats to consider when specifying your pattern.

I agree that there is room for improvement when it comes to the state checkpointed in Flink. We already have some ideas, but still, as you also said, the bulk of the space consumption comes from the pattern definition. So it would be nice if more people did the same, i.e. shared their experience and, why not, compiled a guide of things to avoid, to put alongside the rest of the FlinkCEP documentation.

What do you think?

Kostas

On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Hello all. There were recent changes to the flink master that I pulled in, and they *seem* to have solved our issue.

A few points:

* CEP is heavy, as the NFA keeps the transition matrix as state, which can be possibly n^2 (worst case) and can easily blow up space requirements. The after-match skip strategy is likely to play a crucial role in keeping the state lean: https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy. In our case we do not require partial matches within a match to contribute to another potential match (noise for us), and thus *SKIP_PAST_LAST_EVENT* was used, which on match will prune the SharedBuffer (almost reset it); see the sketch after this message.

* The argument that the pattern events should be lean holds much more in CEP, due to the potential exponential increase in space requirements.

* The nature of the pattern will require consideration if state does blow up for you.

Apart from that, I am still not sure why toString() on SharedBuffer was called to get an OOM to begin with.
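For reference, attaching the strategy looks roughly like the sketch below, assuming the skip-strategy API on flink master at the time (the Event type and the numbers are illustrative, and the AfterMatchSkipStrategy package has moved between Flink versions):

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SkipStrategySketch {

    // Hypothetical event type, for illustration only.
    public static class Event {
        public boolean isTrue;
    }

    public static Pattern<Event, ?> pattern() {
        // Once a match is emitted, SKIP_PAST_LAST_EVENT drops all partial
        // matches that started inside the matched span, so the SharedBuffer
        // is almost reset between matches.
        AfterMatchSkipStrategy skip = AfterMatchSkipStrategy.skipPastLastEvent();

        return Pattern.<Event>begin("start", skip)
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) { return e.isTrue; }
                })
                .times(40)                  // e.g. 40 true conditions ...
                .within(Time.hours(6));     // ... within 6 hours
    }
}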
On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

We could not recreate this in a controlled setup, but here are a few notes that we have gathered on a simple "times(n).within(..)" pattern.

In the case where an Event does not create a Final or Stop state:

* As an NFA processes an Event, the NFA mutates if the Event is true. Each computation is a counter that keeps track of a partial match, with each true Event added to the already existing partial match for that computation unit. Essentially, for n Events, if each Event is true, there will be roughly n-1 computations, each representing an Event from 1 to n-1 (so the first will have n-1 events in its partial match, the second n-2 events, and so on, until the (n-1)th has only the last event as a partial match).

* If the WM progresses beyond the ts of the 1st computation, that partial match is pruned.

* It makes sure that a SharedBufferEntry is pruned only if the count of Edges originating from it reduces to 0 (the internalRemove(), which uses a Stack), which should happen as the WM keeps progressing to the nth element for unfulfilled patterns. A "null" event (not a fan) is used to establish WM progression.

In the case where there is a FinalState (and we skipToFirstAfterLast):

* The NFA will prune (release) all partial matches, prune the shared buffer, and emit the current match. The computations should now be empty.

There is a lot to it, but is that roughly what is done in that code?

A few questions:

* What we have seen is that the call to the toString() method of SharedBuffer is where the OOM occurs. Now, in the code there is no call to a Log, so we are not sure why, or by whom, the method is called. Surely that is not part of the serialization/deserialization routine, or is it (very surprising if it is)?

* There is no out-of-the-box implementation of an "m out of n" pattern match. We have to resort to "n within range (m * time series slot)", which we do. This is fine, but what it does not allow is an optimization where, once n-m false conditions have been seen, one can prune: at that point there is no way that out of n there will ever be m trues, and thus the SharedBuffer could be pruned up to the last true seen (very akin to skipToFirstAfterLast).

We will keep instrumenting the code (which, apart from the null message, is easily understandable) but would love to hear your feedback.

On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Thanks a lot Vishal!

We are looking forward to a test case that reproduces the failure.

Kostas

On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

This is the pattern. Will create a test case.

/**
 * @param condition a single condition applied as an acceptance criterion
 * @param params    defines the bounds of the pattern
 * @param mapFunc   post-processes the list of matches
 * @param patternId identifies the compiled pattern
 * @param <U>       the element type in the stream
 * @return the compiled pattern along with the params
 */
public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
        SimpleCondition<U> condition,
        RelaxedContiguityWithinTime params,
        RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
        String patternId) {
    assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
    Pattern<U, ?> pattern = Pattern.<U>begin(START).where(condition);
    if (params.elementCount > 1) {
        pattern = pattern
                .followedBy(REST)
                .where(condition)
                .times(params.elementCount - 1);
    }
    return new RelaxedContiguousPattern<U>(
            pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
            params,
            params.elementCount > 1,
            params.period.duration,
            mapFunc,
            patternId);
}
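In plain FlinkCEP terms, the factory above encodes "m out of n slots" roughly as the following sketch, with illustrative names and numbers (say 20 true conditions out of 25 hourly slots), not the actual RelaxedContiguousPattern internals:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RelaxedContiguitySketch {

    // Hypothetical event type, for illustration only.
    public static class Event {
        public boolean isTrue;
    }

    public static Pattern<Event, ?> twentyOfTwentyFiveHourly() {
        SimpleCondition<Event> isTrue = new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return e.isTrue; }
        };
        // First true event, then 19 more with relaxed contiguity
        // (followedBy skips the false slots in between), all inside
        // the full 25-hour series window.
        return Pattern.<Event>begin("start").where(isTrue)
                .followedBy("rest").where(isTrue)
                .times(19)
                .within(Time.hours(25));
    }
}

As the notes above point out, within() alone cannot short-circuit once more than n-m false slots have passed; that is the gap the iterative-condition sketch earlier tries to approximate.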
On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:

Could you provide some example to reproduce the case? Or the Pattern that you are using? It would help track down the issue.

On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

I have pulled in the flink master CEP library, and the runtime (the cluster) is configured to work against the latest and greatest. This does not happen with smaller range patterns (3 out of 5, 1 out of 3, etc.) but is always an issue when it is a larger range (20 out of 25 with a range of 8 hours). Does that make sense?

On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:

This problem sounds very similar to this one that was fixed for 1.4.1 and 1.5.0:
https://issues.apache.org/jira/browse/FLINK-8226

Could you check if that helps with your problem too?

On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

I have the flink master CEP library code imported into a 1.4 build.

On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

A new one:

java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
    at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
    at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
    ...

It is the toString() on SharedBuffer, no doubt. Some recursive loop?

On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

It happens when it looks to throw an exception and calls sharedBuffer.toString(), because of the check:

int id = sharedBuffer.entryId;
Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);

On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

The watermark has not moved for this pattern to succeed (or otherwise). The issue, though, is that this happens pretty early in the pipe (like within a minute). I am replaying from a kafka topic, but the keyed operator has emitted no more than 1500-plus elements to SelectCEPOperator (very visible on the UI), so I am sure not enough elements have been added to the SharedBuffer to create memory stress.

The nature of the input stream is that events are pushed out with a specific timestamp (it is a time series, and the timestamp is the beginning of the time slot), as in one will have a bunch of elements with a constant timestamp until the next batch appears.

A batch, though, does not have more than the number of keys (600) elements.

On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

This is a pretty simple pattern, as in I hardly have 1500 elements (across 600 keys at the max) put in, and though I have a pretty wide range, as in I am looking at a relaxed pattern (like 40 true conditions in 6 hours), I get this. I have EventTime turned on.

java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
    at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
    ...

Has anyone seen this issue?