Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3624#discussion_r108372187 --- Diff: docs/dev/libs/cep.md --- @@ -124,13 +124,70 @@ val start : Pattern[Event, _] = Pattern.begin("start") </div> </div> -Each state must have an unique name to identify the matched events later on. +Each state must have a unique name to identify the matched events later on. Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method. +These filtering conditions can be either an `IterativeCondition` or a `SimpleCondition`. + +**Iterative Conditions:** This type of conditions can iterate over the previously accepted elements in the pattern and +decide to accept a new element or not, based on some statistic over those elements. + +Below is the code for an iterative condition that accepts elements whose name start with "foo" and for which, the sum +of the prices of the previously accepted elements for a state named "middle", plus the price of the current event, do +not exceed the value of 5.0. Iterative condition can be very powerful, especially in combination with quantifiers, e.g. +`oneToMany` or `zeroToMany`. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +start.where(new IterativeCondition<SubEvent>() { + @Override + public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception { + if (!value.getName().startsWith("foo")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("middle")) { + sum += event.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) < 0; + } +}); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +start.where( --- End diff -- The other option would be to have just scala wrapper around `Context` and use it in the methods accepting lambdas. Underneath it would be used in IterativeCondition. Not sure if the methods accepting objects e.g. `IterativeCondition` in Scala would be used at all. E.g. ``` where( filterFun: (K, ScalaContext[K]) => Boolean) : Pattern[T, F] = { val filter = new IterativeCondition[F] { val cleanFilter = cep.scala.cleanClosure(filterFun) override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value, ScalaContext(ctx)) } where(filter) } ```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---