This is the pattern. I will create a test case.

/**
 * Builds a relaxed-contiguity pattern from a single condition.
 *
 * @param condition a single condition applied as the acceptance criterion
 * @param params    the bounds of the pattern
 * @param mapFunc   map function applied to the list of pattern matches
 * @param patternId identifier of the pattern
 * @param <U>       the element type in the stream
 * @return the compiled pattern along with the params
 */
public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
        SimpleCondition<U> condition,
        RelaxedContiguityWithinTime params,
        RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
        String patternId) {
    assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
    // The first matching element starts the pattern.
    Pattern<U, ?> pattern = Pattern
            .<U>begin(START)
            .where(condition);
    // The remaining (elementCount - 1) matches follow with relaxed contiguity.
    if (params.elementCount > 1) {
        pattern = pattern
                .followedBy(REST)
                .where(condition)
                .times(params.elementCount - 1);
    }

    return new RelaxedContiguousPattern<U>(
            pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
            params,
            params.elementCount > 1,
            params.period.duration,
            mapFunc,
            patternId
    );
}
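
One note on the bounds check above: Java `assert` statements are skipped unless the JVM is started with `-ea`, so on a cluster launched without that flag the check on `seriesLength` and `elementCount` may never run. Below is a minimal sketch of an explicit check plus the window arithmetic. `PatternBounds`, `periodMinutes` and `windowMinutes()` are hypothetical stand-ins for `RelaxedContiguityWithinTime` and its fields, and the numbers in `main` are arbitrary.

```java
/** Hypothetical stand-in for the bounds carried by RelaxedContiguityWithinTime. */
final class PatternBounds {
    final int seriesLength;   // total number of slots in the series
    final int elementCount;   // slots that must satisfy the condition
    final int periodMinutes;  // assumed: length of one slot, in minutes

    PatternBounds(int seriesLength, int elementCount, int periodMinutes) {
        // Explicit checks run whether or not the JVM was started with -ea.
        if (elementCount <= 0) {
            throw new IllegalArgumentException("elementCount must be > 0, got " + elementCount);
        }
        if (seriesLength < elementCount) {
            throw new IllegalArgumentException(
                    "seriesLength (" + seriesLength + ") must be >= elementCount (" + elementCount + ")");
        }
        if (periodMinutes <= 0) {
            throw new IllegalArgumentException("periodMinutes must be > 0, got " + periodMinutes);
        }
        this.seriesLength = seriesLength;
        this.elementCount = elementCount;
        this.periodMinutes = periodMinutes;
    }

    /** Window length in minutes; computed in long arithmetic to avoid int overflow. */
    long windowMinutes() {
        return (long) seriesLength * periodMinutes;
    }
}

final class PatternBoundsDemo {
    public static void main(String[] args) {
        PatternBounds bounds = new PatternBounds(25, 20, 15);
        System.out.println("window = " + bounds.windowMinutes() + " minutes");
    }
}
```

The value returned by `windowMinutes()` corresponds to what the pattern above passes to `Time.minutes(...)`.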




On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:

> Could you provide some example to reproduce the case? Or the Pattern that
> you are using? It would help track down the issue.
>
> > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> >
> > I have pulled in the Flink master CEP library, and the runtime (the
> > cluster) is configured to work against the latest and greatest. This
> > does not happen with smaller-range patterns (3 out of 5, 1 of 3, etc.)
> > but is always an issue with a larger range (20 out of 25 with a range
> > of 8 hours). Does that make sense?
> >
> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
> > This problem sounds very similar to this one that was fixed for 1.4.1
> > and 1.5.0:
> > https://issues.apache.org/jira/browse/FLINK-8226
> >
> > Could you check if that helps with your problem too?
> >
> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> > >
> > > I have the Flink master CEP library code imported into a 1.4 build.
> > >
> > > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> > > A new one
> > >
> > > > java.lang.OutOfMemoryError: Java heap space
> > > >       at java.util.Arrays.copyOf(Arrays.java:3332)
> > > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> > > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> > > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
> > > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> > > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
> > > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> > > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> > > >       at java.lang.String.valueOf(String.java:2994)
> > > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> > > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
> > > >       at java.lang.String.valueOf(String.java:2994)
> > > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> > > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
> > > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> > > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> > > >       at java.lang.String.valueOf(String.java:2994)
> > > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> > > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
> > > > .
> > > > .
> > > > .
> > > > It is the toString() on SharedBuffer, no doubt. Some recursive loop?
> > >
> > >
> > > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> > > > It happens when it looks to throw an exception and calls
> > > > sharedBuffer.toString() because of this check:
> > > >
> > > >
> > > > int id = sharedBuffer.entryId;
> > > > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
> > >
> > >
> > > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> > > > The watermark has not moved for this pattern to succeed (or
> > > > otherwise). The issue, though, is that it happens pretty early in
> > > > the pipe (within a minute or so). I am replaying from a Kafka topic,
> > > > but the keyed operator has emitted no more than 1500-plus elements
> > > > to SelectCEPOperator (very visible in the UI), so I am sure not
> > > > enough elements have been added to the SharedBuffer to create
> > > > memory stress.
> > > >
> > > > The nature of the input stream is that events are pushed out with a
> > > > specific timestamp (it is a time series, and the timestamp is the
> > > > beginning of the time slot), so one will have a bunch of elements
> > > > with a constant timestamp until the next batch appears.
> > > >
> > > > A batch, though, does not have more elements than the number of
> > > > keys (600).
> > >
> > > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> > > > This is a pretty simple pattern, as in I hardly have 1500 elements
> > > > (across 600 keys at the max) put in, and though I have a pretty
> > > > wide range, as in I am looking at a relaxed pattern (like 40 true
> > > > conditions in 6 hours), I get this. I have EventTime turned on.
> > >
> > >
> > > > java.lang.OutOfMemoryError: Java heap space
> > > >       at java.util.Arrays.copyOf(Arrays.java:3332)
> > > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> > > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> > > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
> > > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> > > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
> > > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> > > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> > > >       at java.lang.String.valueOf(String.java:2994)
> > > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> > > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
> > > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
> > > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
> > > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
> > > >       at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
> > > .
> > > .
> > > .
> > >
> > > > Has anyone seen this issue?
> > >
> > >
> > >
> > >
> > >
> >
> >
>
>
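
On the "recursive loop?" question in the thread: the trace alternates between `SharedBufferEntry.toString` and `SharedBufferEdge.toString`, which would fit a structure where each entry prints its edges and each edge prints its target. Such a structure does not need a cycle to exhaust the heap. If entries share predecessors, the printed form repeats every shared subtree and can grow exponentially even though the number of objects stays small. The sketch below is a toy model under that assumption, not Flink's `SharedBuffer`; `Entry`, `Edge` and `chain` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class ToStringBlowupDemo {
    /** Toy entry: prints its name followed by all outgoing edges. */
    static final class Entry {
        final String name;
        final List<Edge> edges = new ArrayList<>();

        Entry(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder(name).append('[');
            for (Edge edge : edges) {
                sb.append(edge); // delegates to Edge.toString()
            }
            return sb.append(']').toString();
        }
    }

    /** Toy edge: prints the entry it points to. */
    static final class Edge {
        final Entry target;

        Edge(Entry target) {
            this.target = target;
        }

        @Override
        public String toString() {
            return "->" + target; // delegates to Entry.toString()
        }
    }

    /** Builds `depth` entries; each entry has two edges to the same predecessor. */
    static Entry chain(int depth) {
        Entry previous = new Entry("e0");
        for (int i = 1; i < depth; i++) {
            Entry next = new Entry("e" + i);
            next.edges.add(new Edge(previous));
            next.edges.add(new Edge(previous));
            previous = next;
        }
        return previous;
    }

    public static void main(String[] args) {
        // The object count grows linearly with depth; the printed length does not.
        for (int depth : new int[] {5, 10, 15, 20}) {
            int length = chain(depth).toString().length();
            System.out.println(depth + " entries -> " + length + " characters");
        }
    }
}
```

In this toy model the printed length more than doubles with each added entry, so a chain of 40 entries would be far beyond any heap while holding only 40 objects. Whether the real buffer has this shape for a 20-of-25 or 40-in-6-hours pattern is an assumption that the promised test case would need to confirm.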
