Thanks a lot Vishal! We are looking forward to a test case that reproduces the failure.
Kostas > On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote: > > This is the pattern. Will create a test case. > /** > * > * @param condition a single condition is applied as a acceptance criteria > * @param params defining the bounds of the pattern. > * @param <U> the element in the stream > * @return compiled pattern alonf with the params. > */ > public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> > of(SimpleCondition<U> condition, > > RelaxedContiguityWithinTime params, > > RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc, > > String patternId) { > assert (params.seriesLength >= params.elementCount && params.elementCount > > 0); > Pattern<U, ?> pattern = Pattern. > <U>begin(START). > where(condition); > if (params.elementCount > 1) pattern = pattern. > followedBy(REST). > where(condition). > times(params.elementCount - 1); > > return new RelaxedContiguousPattern<U>( > pattern.within(Time.minutes(params.seriesLength * > params.period.duration)) > ,params, > params.elementCount > 1, > params.period.duration, > mapFunc, > patternId > ); > } > > > > On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com > <mailto:wysakowicz.da...@gmail.com>> wrote: > Could you provide some example to reproduce the case? Or the Pattern that you > are using? It would help track down the issue. > > > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com > > <mailto:vishal.santo...@gmail.com>> wrote: > > > > I have pulled in the flink master cep library and the runtime ( the cluster > > ) is configured to work against the latest and greatest. This does not > > happen with smaller range patterns ( 3 out of 5 , 1 of 3 etc) but is always > > an issue when it is a larger range ( 20 out of 25 with range of 8 hours ) . > > Does that makes sense? > > > > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz > > <wysakowicz.da...@gmail.com <mailto:wysakowicz.da...@gmail.com>> wrote: > > This problem sounds very similar to this one that was fixed for 1.4.1 and > > 1.5.0: > > https://issues.apache.org/jira/browse/FLINK-8226 > > <https://issues.apache.org/jira/browse/FLINK-8226> > > > > Could you check if that helps with your problem too? > > > > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com > > > <mailto:vishal.santo...@gmail.com>> wrote: > > > > > > I have flink master CEP library code imported to a 1.4 build. > > > > > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi > > > <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote: > > > A new one > > > > > > java.lang.OutOfMemoryError: Java heap space > > > at java.util.Arrays.copyOf( > > > Arrays.java:3332) > > > at java.lang. > > > AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java: > > > 124) > > > at java.lang. > > > AbstractStringBuilder.append(AbstractStringBuilder.java: > > > 448) > > > at java.lang.StringBuilder. > > > append(StringBuilder.java:136) > > > at java.lang.StringBuilder. > > > append(StringBuilder.java:131) > > > at org.apache.commons.lang3. > > > StringUtils.join(StringUtils. > > > java:4106) > > > at org.apache.commons.lang3. > > > StringUtils.join(StringUtils. > > > java:4151) > > > at org.apache.flink.cep.nfa. > > > SharedBuffer$SharedBufferEntry.toString( > > > SharedBuffer.java:624) > > > at java.lang.String.valueOf( > > > String.java:2994) > > > at java.lang.StringBuilder. > > > append(StringBuilder.java:131) > > > at org.apache.flink.cep.nfa. > > > SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java: > > > 673) > > > at java.lang.String.valueOf( > > > String.java:2994) > > > at java.lang.StringBuilder. > > > append(StringBuilder.java:131) > > > at org.apache.commons.lang3. > > > StringUtils.join(StringUtils. > > > java:4097) > > > at org.apache.commons.lang3. > > > StringUtils.join(StringUtils. > > > java:4151) > > > at org.apache.flink.cep.nfa. > > > SharedBuffer$SharedBufferEntry.toString( > > > SharedBuffer.java:624) > > > at java.lang.String.valueOf( > > > String.java:2994) > > > at java.lang.StringBuilder. > > > append(StringBuilder.java:131) > > > at org.apache.flink.cep.nfa. > > > SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673) > > > . > > > . > > > . > > > It is the toString() on > > > SharedBuffer > > > no doubt. Some recursive loop ? > > > > > > > > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi > > > <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote: > > > It happens when it looks to throw an exception and calls > > > shardBuffer.toString. b'coz of the check.... > > > > > > > > > int id = sharedBuffer.entryId; > > > Preconditions.checkState(id != -1, "Could not find id for entry: " + > > > sharedBuffer); > > > > > > > > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi > > > <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote: > > > The watermark has not moved for this pattern to succeed ( or other wise > > > ), the issue though is that it is pretty early in the pipe ( like within > > > a minute ). I am replaying from a kafka topic but the keyed operator has > > > emitted no more than 1500 plus elements to SelectCEPOperator ( very > > > visible on the UI ) so am sure not enough elements have been added to the > > > SharedBuffer to create memory stress. > > > > > > The nature of the input stream is that events are pushed out with a > > > specific timestamp ( it is a time series and the timestamp if the > > > beginning of the time slot ) as in one will have a bunch of elements > > > that have a constant timestamp till the next batch appears. > > > > > > A batch though does not have more than the number of keys elements ( 600 > > > ). > > > > > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi > > > <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote: > > > This is a pretty simple pattern, as in I hardly have 1500 elements ( > > > across 600 keys at the max ) put in > > > and though I have a pretty wide range , as in I am looking at a relaxed > > > pattern ( like 40 true conditions in 6 hours ), > > > I get this. I have the EventTime turned on. > > > > > > > > > java.lang.OutOfMemoryError: Java heap space > > > at java.util.Arrays.copyOf(Arrays > > > .java:3332) > > > at java.lang.AbstractStringBuilde > > > r.ensureCapacityInternal(Abstr > > > actStringBuilder.java:124) > > > at java.lang.AbstractStringBuilde > > > r.append(AbstractStringBuilder > > > .java:448) > > > at java.lang.StringBuilder.append > > > (StringBuilder.java:136) > > > at java.lang.StringBuilder.append > > > (StringBuilder.java:131) > > > at org.apache.commons.lang3.Strin > > > gUtils.join(StringUtils.java:4 > > > 106) > > > at org.apache.commons.lang3.Strin > > > gUtils.join(StringUtils.java:4 > > > 151) > > > at org.apache.flink.cep.nfa.Share > > > dBuffer$SharedBufferEntry.toSt > > > ring(SharedBuffer.java:624) > > > at java.lang.String.valueOf(Strin > > > g.java:2994) > > > at java.lang.StringBuilder.append > > > (StringBuilder.java:131) > > > at org.apache.flink.cep.nfa.Share > > > dBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:9 > > > 64) > > > at org.apache.flink.cep.nfa.Share > > > dBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:8 > > > 35) > > > at org.apache.flink.cep.nfa.NFA$N > > > FASerializer.serialize(NFA.jav > > > a:888) > > > at org.apache.flink.cep.nfa.NFA$N > > > FASerializer.serialize(NFA.jav > > > a:820) > > > at org.apache.flink.contrib.strea > > > ming.state.RocksDBValueState.update(RocksDBValueState.java:100) > > > . > > > . > > > . > > > > > > Any one has seen this issue ? > > > > > > > > > > > > > > > > > > > > >