[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701924#comment-16701924
 ] 

ASF GitHub Bot commented on FLINK-8159:
---------------------------------------

dawidwys closed pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
 
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
index 33d24de7748..91a15032a91 100644
--- 
a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
+++ 
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -113,7 +113,7 @@ class PatternTest {
 
     assertTrue(pattern.getCondition.isDefined)
     assertTrue(previous.getCondition.isDefined)
-    assertFalse(preprevious.getCondition.isDefined)
+    assertTrue(preprevious.getCondition.isDefined)
 
     assertEquals(pattern.getName, "end")
     assertEquals(previous.getName, "next")
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
new file mode 100644
index 00000000000..ff6e610845c
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition 
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state 
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
+
+       private final RuntimeContext runtimeContext;
+
+       public CepRuntimeContext(RuntimeContext runtimeContext) {
+               this.runtimeContext = runtimeContext;
+       }
+
+       @Override
+       public String getTaskName() {
+               return runtimeContext.getTaskName();
+       }
+
+       @Override
+       public MetricGroup getMetricGroup() {
+               return runtimeContext.getMetricGroup();
+       }
+
+       @Override
+       public int getNumberOfParallelSubtasks() {
+               return runtimeContext.getNumberOfParallelSubtasks();
+       }
+
+       @Override
+       public int getMaxNumberOfParallelSubtasks() {
+               return runtimeContext.getMaxNumberOfParallelSubtasks();
+       }
+
+       @Override
+       public int getIndexOfThisSubtask() {
+               return runtimeContext.getIndexOfThisSubtask();
+       }
+
+       @Override
+       public int getAttemptNumber() {
+               return runtimeContext.getAttemptNumber();
+       }
+
+       @Override
+       public String getTaskNameWithSubtasks() {
+               return runtimeContext.getTaskNameWithSubtasks();
+       }
+
+       @Override
+       public ExecutionConfig getExecutionConfig() {
+               return runtimeContext.getExecutionConfig();
+       }
+
+       @Override
+       public ClassLoader getUserCodeClassLoader() {
+               return runtimeContext.getUserCodeClassLoader();
+       }
+
+       // 
-----------------------------------------------------------------------------------
+       // Unsupported operations
+       // 
-----------------------------------------------------------------------------------
+
+       @Override
+       public <V, A extends Serializable> void addAccumulator(
+               String name, Accumulator<V, A> accumulator) {
+               throw new UnsupportedOperationException("Accumulators are not 
supported.");
+       }
+
+       @Override
+       public <V, A extends Serializable> Accumulator<V, A> 
getAccumulator(String name) {
+               throw new UnsupportedOperationException("Accumulators are not 
supported.");
+       }
+
+       @Override
+       public Map<String, Accumulator<?, ?>> getAllAccumulators() {
+               throw new UnsupportedOperationException("Accumulators are not 
supported.");
+       }
+
+       @Override
+       public IntCounter getIntCounter(String name) {
+               throw new UnsupportedOperationException("Int counters are not 
supported.");
+       }
+
+       @Override
+       public LongCounter getLongCounter(String name) {
+               throw new UnsupportedOperationException("Long counters are not 
supported.");
+       }
+
+       @Override
+       public DoubleCounter getDoubleCounter(String name) {
+               throw new UnsupportedOperationException("Double counters are 
not supported.");
+       }
+
+       @Override
+       public Histogram getHistogram(String name) {
+               throw new UnsupportedOperationException("Histograms are not 
supported.");
+       }
+
+       @Override
+       public boolean hasBroadcastVariable(String name) {
+               throw new UnsupportedOperationException("Broadcast variables 
are not supported.");
+       }
+
+       @Override
+       public <RT> List<RT> getBroadcastVariable(String name) {
+               throw new UnsupportedOperationException("Broadcast variables 
are not supported.");
+       }
+
+       @Override
+       public <T, C> C getBroadcastVariableWithInitializer(
+               String name, BroadcastVariableInitializer<T, C> initializer) {
+               throw new UnsupportedOperationException("Broadcast variables 
are not supported.");
+       }
+
+       @Override
+       public DistributedCache getDistributedCache() {
+               throw new UnsupportedOperationException("Distributed cache is 
not supported.");
+       }
+
+       @Override
+       public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
+               throw new UnsupportedOperationException("State is not 
supported.");
+       }
+
+       @Override
+       public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
+               throw new UnsupportedOperationException("State is not 
supported.");
+       }
+
+       @Override
+       public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> 
stateProperties) {
+               throw new UnsupportedOperationException("State is not 
supported.");
+       }
+
+       @Override
+       public <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+               throw new UnsupportedOperationException("State is not 
supported.");
+       }
+
+       @Override
+       public <T, ACC> FoldingState<T, ACC> 
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+               throw new UnsupportedOperationException("State is not 
supported.");
+       }
+
+       @Override
+       public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> 
stateProperties) {
+               throw new UnsupportedOperationException("State is not 
supported.");
+       }
+}
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
new file mode 100644
index 00000000000..a2b89c37e97
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link 
RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output element
+ */
+public abstract class RichPatternFlatSelectFunction<IN, OUT>
+               extends AbstractRichFunction
+               implements PatternFlatSelectFunction<IN, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void setRuntimeContext(RuntimeContext runtimeContext) {
+               Preconditions.checkNotNull(runtimeContext);
+
+               if (runtimeContext instanceof CepRuntimeContext) {
+                       super.setRuntimeContext(runtimeContext);
+               } else {
+                       super.setRuntimeContext(new 
CepRuntimeContext(runtimeContext));
+               }
+       }
+
+       public abstract void flatSelect(Map<String, List<IN>> pattern, 
Collector<OUT> out) throws Exception;
+}
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
new file mode 100644
index 00000000000..ce694a38e05
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rich variant of the {@link PatternSelectFunction}. As a {@link 
RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output element
+ */
+public abstract class RichPatternSelectFunction<IN, OUT>
+               extends AbstractRichFunction
+               implements PatternSelectFunction<IN, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void setRuntimeContext(RuntimeContext runtimeContext) {
+               Preconditions.checkNotNull(runtimeContext);
+
+               if (runtimeContext instanceof CepRuntimeContext) {
+                       super.setRuntimeContext(runtimeContext);
+               } else {
+                       super.setRuntimeContext(new 
CepRuntimeContext(runtimeContext));
+               }
+       }
+
+       public abstract OUT select(Map<String, List<IN>> pattern) throws 
Exception;
+}
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index ed2ff2e7e59..bc692f2bc0c 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -29,10 +29,10 @@
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.Quantifier.Times;
-import org.apache.flink.cep.pattern.conditions.AndCondition;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.cep.pattern.conditions.NotCondition;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichNotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.io.Serializable;
@@ -288,9 +288,9 @@ private void checkPatternNameUniqueness(final Pattern 
pattern) {
 
                                        if (lastSink.isFinal()) {
                                                //so that the proceed to final 
is not fired
-                                               notNext.addIgnore(lastSink, new 
NotCondition<>(notCondition));
+                                               notNext.addIgnore(lastSink, new 
RichNotCondition<>(notCondition));
                                        } else {
-                                               notNext.addProceed(lastSink, 
new NotCondition<>(notCondition));
+                                               notNext.addProceed(lastSink, 
new RichNotCondition<>(notCondition));
                                        }
                                        notNext.addProceed(stopState, 
notCondition);
                                        lastSink = notNext;
@@ -612,11 +612,11 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
                                        if (untilCondition != null) {
                                                singletonState.addProceed(
                                                        
originalStateMap.get(proceedState.getName()),
-                                                       new 
AndCondition<>(proceedCondition, untilCondition));
+                                                       new 
RichAndCondition<>(proceedCondition, untilCondition));
                                        }
                                        singletonState.addProceed(proceedState,
                                                untilCondition != null
-                                                       ? new 
AndCondition<>(proceedCondition, new NotCondition<>(untilCondition))
+                                                       ? new 
RichAndCondition<>(proceedCondition, new RichNotCondition<>(untilCondition))
                                                        : proceedCondition);
                                } else {
                                        singletonState.addProceed(proceedState, 
proceedCondition);
@@ -734,12 +734,12 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
                        if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
                                if (untilCondition != null) {
                                        State<T> sinkStateCopy = 
copy(sinkState);
-                                       loopingState.addProceed(sinkStateCopy, 
new AndCondition<>(proceedCondition, untilCondition));
+                                       loopingState.addProceed(sinkStateCopy, 
new RichAndCondition<>(proceedCondition, untilCondition));
                                        
originalStateMap.put(sinkState.getName(), sinkStateCopy);
                                }
                                loopingState.addProceed(sinkState,
                                        untilCondition != null
-                                               ? new 
AndCondition<>(proceedCondition, new NotCondition<>(untilCondition))
+                                               ? new 
RichAndCondition<>(proceedCondition, new RichNotCondition<>(untilCondition))
                                                : proceedCondition);
                                updateWithGreedyCondition(sinkState, 
getTakeCondition(currentPattern));
                        } else {
@@ -774,9 +774,9 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
                                IterativeCondition<T> untilCondition,
                                boolean isTakeCondition) {
                        if (untilCondition != null && condition != null) {
-                               return new AndCondition<>(new 
NotCondition<>(untilCondition), condition);
+                               return new RichAndCondition<>(new 
RichNotCondition<>(untilCondition), condition);
                        } else if (untilCondition != null && isTakeCondition) {
-                               return new NotCondition<>(untilCondition);
+                               return new RichNotCondition<>(untilCondition);
                        }
 
                        return condition;
@@ -802,7 +802,7 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
                                        innerIgnoreCondition = null;
                                        break;
                                case SKIP_TILL_NEXT:
-                                       innerIgnoreCondition = new 
NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+                                       innerIgnoreCondition = new 
RichNotCondition<>((IterativeCondition<T>) pattern.getCondition());
                                        break;
                                case SKIP_TILL_ANY:
                                        innerIgnoreCondition = 
BooleanConditions.trueFunction();
@@ -843,7 +843,7 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
                                        ignoreCondition = null;
                                        break;
                                case SKIP_TILL_NEXT:
-                                       ignoreCondition = new 
NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+                                       ignoreCondition = new 
RichNotCondition<>((IterativeCondition<T>) pattern.getCondition());
                                        break;
                                case SKIP_TILL_ANY:
                                        ignoreCondition = 
BooleanConditions.trueFunction();
@@ -896,7 +896,7 @@ private void updateWithGreedyCondition(
                        IterativeCondition<T> takeCondition) {
                        for (StateTransition<T> stateTransition : 
state.getStateTransitions()) {
                                stateTransition.setCondition(
-                                       new 
AndCondition<>(stateTransition.getCondition(), new 
NotCondition<>(takeCondition)));
+                                       new 
RichAndCondition<>(stateTransition.getCondition(), new 
RichNotCondition<>(takeCondition)));
                        }
                }
        }
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index b57c3fe0b2f..c603741cea5 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -33,10 +34,14 @@
 import org.apache.flink.cep.nfa.NFA.MigratedNFA;
 import org.apache.flink.cep.nfa.NFAState;
 import org.apache.flink.cep.nfa.NFAStateSerializer;
+import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.VoidNamespace;
@@ -185,6 +190,15 @@ public void open() throws Exception {
                                this);
 
                this.nfa = nfaFactory.createNFA();
+
+               openNFA(nfa);
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+
+               closeNFA(nfa);
        }
 
        @Override
@@ -395,6 +409,26 @@ private void advanceTime(NFAState nfaState, long 
timestamp) throws Exception {
                }
        }
 
+       private void openNFA(NFA<IN> nfa) throws Exception {
+               Configuration conf = new Configuration();
+               for (State<IN> state : nfa.getStates()) {
+                       for (StateTransition<IN> transition : 
state.getStateTransitions()) {
+                               IterativeCondition condition = 
transition.getCondition();
+                               
FunctionUtils.setFunctionRuntimeContext(condition, getRuntimeContext());
+                               FunctionUtils.openFunction(condition, conf);
+                       }
+               }
+       }
+
+       private void closeNFA(NFA<IN> nfa) throws Exception {
+               for (State<IN> state : nfa.getStates()) {
+                       for (StateTransition<IN> transition : 
state.getStateTransitions()) {
+                               IterativeCondition condition = 
transition.getCondition();
+                               FunctionUtils.closeFunction(condition);
+                       }
+               }
+       }
+
        protected abstract void processMatchedSequences(Iterable<Map<String, 
List<IN>>> matchingSequences, long timestamp) throws Exception;
 
        protected void processTimedOutSequences(
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 6ad9d9a1669..64236765caf 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -23,9 +23,10 @@
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
 import org.apache.flink.cep.pattern.Quantifier.Times;
-import org.apache.flink.cep.pattern.conditions.AndCondition;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.cep.pattern.conditions.OrCondition;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
 import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
@@ -105,7 +106,11 @@ public Quantifier getQuantifier() {
        }
 
        public IterativeCondition<F> getCondition() {
-               return condition;
+               if (condition != null) {
+                       return condition;
+               } else {
+                       return BooleanConditions.trueFunction();
+               }
        }
 
        public IterativeCondition<F> getUntilCondition() {
@@ -154,7 +159,7 @@ public Quantifier getQuantifier() {
                if (this.condition == null) {
                        this.condition = condition;
                } else {
-                       this.condition = new AndCondition<>(this.condition, 
condition);
+                       this.condition = new RichAndCondition<>(this.condition, 
condition);
                }
                return this;
        }
@@ -177,7 +182,7 @@ public Quantifier getQuantifier() {
                if (this.condition == null) {
                        this.condition = condition;
                } else {
-                       this.condition = new OrCondition<>(this.condition, 
condition);
+                       this.condition = new RichOrCondition<>(this.condition, 
condition);
                }
                return this;
        }
@@ -196,7 +201,7 @@ public Quantifier getQuantifier() {
                if (condition == null) {
                        this.condition = new SubtypeCondition<F>(subtypeClass);
                } else {
-                       this.condition = new AndCondition<>(condition, new 
SubtypeCondition<F>(subtypeClass));
+                       this.condition = new RichAndCondition<>(condition, new 
SubtypeCondition<F>(subtypeClass));
                }
 
                @SuppressWarnings("unchecked")
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
index ac34c41301b..4622417bcf1 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -25,7 +26,11 @@
  * {@code AND} and returns {@code true} if both are {@code true}.
  *
  * @param <T> Type of the element to filter
+ * @deprecated Please use {@link RichAndCondition} instead. This class exists 
just for
+ * backwards compatibility and will be removed in FLINK-10113.
  */
+@Internal
+@Deprecated
 public class AndCondition<T> extends IterativeCondition<T> {
 
        private static final long serialVersionUID = -2471892317390197319L;
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
index aea5a3bdd43..17c443fa0d9 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.annotation.Internal;
+
 /**
  * Utility class containing an {@link IterativeCondition} that always returns
  * {@code true} and one that always returns {@code false}.
  */
+@Internal
 public class BooleanConditions {
 
        /**
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
index 9318c2f6772..72dc4bb8697 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
@@ -18,12 +18,18 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.annotation.Internal;
+
 /**
  * A {@link IterativeCondition condition} which negates the condition it wraps
  * and returns {@code true} if the original condition returns {@code false}.
  *
  * @param <T> Type of the element to filter
+ * @deprecated Please use {@link RichNotCondition} instead. This class exists 
just for
+ * backwards compatibility and will be removed in FLINK-10113.
  */
+@Internal
+@Deprecated
 public class NotCondition<T> extends IterativeCondition<T> {
        private static final long serialVersionUID = -2109562093871155005L;
 
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
index d3690ab4da0..ac8c465c59f 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -25,7 +26,11 @@
  * {@code OR} and returns {@code true} if at least one is {@code true}.
  *
  * @param <T> Type of the element to filter
+ * @deprecated Please use {@link RichOrCondition} instead. This class exists 
just for
+ * backwards compatibility and will be removed in FLINK-10113.
  */
+@Internal
+@Deprecated
 public class OrCondition<T> extends IterativeCondition<T> {
 
        private static final long serialVersionUID = 2554610954278485106L;
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
new file mode 100644
index 00000000000..1d5d1e7367d
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param <T> Type of the element to filter
+ */
+@Internal
+public class RichAndCondition<T> extends RichCompositeIterativeCondition<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       public RichAndCondition(final IterativeCondition<T> left, final 
IterativeCondition<T> right) {
+               super(left, right);
+       }
+
+       @Override
+       public boolean filter(T value, Context<T> ctx) throws Exception {
+               return getLeft().filter(value, ctx) && getRight().filter(value, 
ctx);
+       }
+
+       /**
+        * @return One of the {@link IterativeCondition conditions} combined in 
this condition.
+        */
+       public IterativeCondition<T> getLeft() {
+               return getNestedConditions()[0];
+       }
+
+       /**
+        * @return One of the {@link IterativeCondition conditions} combined in 
this condition.
+        */
+       public IterativeCondition<T> getRight() {
+               return getNestedConditions()[1];
+       }
+}
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichCompositeIterativeCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichCompositeIterativeCondition.java
new file mode 100644
index 00000000000..5f7789adba6
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichCompositeIterativeCondition.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A base class of composite {@link IterativeCondition} conditions such as 
{@link RichAndCondition},
+ * {@link RichOrCondition} and {@link RichNotCondition}, etc. It handles the 
open, close and
+ * setRuntimeContext for the nested {@link IterativeCondition} conditions.
+ *
+ * @param <T> Type of the element to filter
+ */
+public abstract class RichCompositeIterativeCondition<T> extends 
RichIterativeCondition<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final IterativeCondition<T>[] nestedConditions;
+
+       @SafeVarargs
+       public RichCompositeIterativeCondition(final IterativeCondition<T>... 
nestedConditions) {
+               for (IterativeCondition<T> condition : nestedConditions) {
+                       Preconditions.checkNotNull(condition, "The condition 
cannot be null.");
+               }
+               this.nestedConditions = nestedConditions;
+       }
+
+       public IterativeCondition<T>[] getNestedConditions() {
+               return nestedConditions;
+       }
+
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               super.setRuntimeContext(t);
+
+               for (IterativeCondition<T> nestedCondition : nestedConditions) {
+                       
FunctionUtils.setFunctionRuntimeContext(nestedCondition, t);
+               }
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               for (IterativeCondition<T> nestedCondition : nestedConditions) {
+                       FunctionUtils.openFunction(nestedCondition, parameters);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               for (IterativeCondition<T> nestedCondition : nestedConditions) {
+                       FunctionUtils.closeFunction(nestedCondition);
+               }
+       }
+}
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
new file mode 100644
index 00000000000..12f017e9946
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.cep.CepRuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Rich variant of the {@link IterativeCondition}. As a {@link RichFunction}, 
it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ */
+public abstract class RichIterativeCondition<T>
+               extends IterativeCondition<T>
+               implements RichFunction {
+
+       private static final long serialVersionUID = 1L;
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Runtime context access
+       // 
--------------------------------------------------------------------------------------------
+
+       private transient RuntimeContext runtimeContext;
+
+       @Override
+       public void setRuntimeContext(RuntimeContext runtimeContext) {
+               Preconditions.checkNotNull(runtimeContext);
+
+               if (runtimeContext instanceof CepRuntimeContext) {
+                       this.runtimeContext = runtimeContext;
+               } else {
+                       this.runtimeContext = new 
CepRuntimeContext(runtimeContext);
+               }
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               if (this.runtimeContext != null) {
+                       return this.runtimeContext;
+               } else {
+                       throw new IllegalStateException("The runtime context 
has not been initialized.");
+               }
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               throw new UnsupportedOperationException("Not support to get the 
IterationRuntimeContext in IterativeCondition.");
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Default life cycle methods
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void open(Configuration parameters) throws Exception {}
+
+       @Override
+       public void close() throws Exception {}
+
+}
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
new file mode 100644
index 00000000000..a4929eb5e15
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A {@link RichIterativeCondition condition} which negates the condition it 
wraps
+ * and returns {@code true} if the original condition returns {@code false}.
+ *
+ * @param <T> Type of the element to filter
+ */
+@Internal
+public class RichNotCondition<T> extends RichCompositeIterativeCondition<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       public RichNotCondition(final IterativeCondition<T> original) {
+               super(original);
+       }
+
+       @Override
+       public boolean filter(T value, Context<T> ctx) throws Exception {
+               return !getNestedConditions()[0].filter(value, ctx);
+       }
+}
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java
new file mode 100644
index 00000000000..03bd1e73092
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code OR} and returns {@code true} if at least one is {@code true}.
+ *
+ * @param <T> Type of the element to filter
+ */
+@Internal
+public class RichOrCondition<T> extends RichCompositeIterativeCondition<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       public RichOrCondition(final IterativeCondition<T> left, final 
IterativeCondition<T> right) {
+               super(left, right);
+       }
+
+       @Override
+       public boolean filter(T value, Context<T> ctx) throws Exception {
+               return getLeft().filter(value, ctx) || getRight().filter(value, 
ctx);
+       }
+
+       /**
+        * @return One of the {@link IterativeCondition conditions} combined in 
this condition.
+        */
+       public IterativeCondition<T> getLeft() {
+               return getNestedConditions()[0];
+       }
+
+       /**
+        * @return One of the {@link IterativeCondition conditions} combined in 
this condition.
+        */
+       public IterativeCondition<T> getRight() {
+               return getNestedConditions()[1];
+       }
+}
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
index 9ca52c5c8d8..de46d1f756d 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.FilterFunction;
 
 /**
@@ -28,6 +29,7 @@
  * previously accepted elements in the pattern. Conditions that extend this 
class are simple {@code filter(...)}
  * functions that decide based on the properties of the element at hand.
  */
+@Internal
 public abstract class SimpleCondition<T> extends IterativeCondition<T> 
implements FilterFunction<T> {
 
        private static final long serialVersionUID = 4942618239408140245L;
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
index cff8693c588..249757d8e59 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -26,6 +27,7 @@
  *
  * @param <T> Type of the elements to be filtered
  */
+@Internal
 public class SubtypeCondition<T> extends SimpleCondition<T> {
        private static final long serialVersionUID = -2990017519957561355L;
 
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index e397d318241..33c1fdd571c 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -19,13 +19,17 @@
 package org.apache.flink.cep;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
@@ -35,6 +39,7 @@
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Either;
+import org.apache.flink.util.Collector;
 
 import org.junit.Test;
 
@@ -716,4 +721,167 @@ public boolean filter(Tuple2<Integer, String> rec) throws 
Exception {
 
                assertEquals(expected, resultList);
        }
+
+       @Test
+       public void testRichPatternFlatSelectFunction() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Event> input = env.fromElements(
+                       new Event(1, "barfoo", 1.0),
+                       new Event(2, "start", 2.0),
+                       new Event(3, "foobar", 3.0),
+                       new SubEvent(4, "foo", 4.0, 1.0),
+                       new Event(5, "middle", 5.0),
+                       new SubEvent(6, "middle", 6.0, 2.0),
+                       new SubEvent(7, "bar", 3.0, 3.0),
+                       new Event(42, "42", 42.0),
+                       new Event(8, "end", 1.0)
+               );
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new RichIterativeCondition<Event>() {
+
+                       @Override
+                       public boolean filter(Event value, Context<Event> ctx) 
throws Exception {
+                               return value.getName().equals("start");
+                       }
+               }).followedByAny("middle").subtype(SubEvent.class).where(
+                       new SimpleCondition<SubEvent>() {
+
+                               @Override
+                               public boolean filter(SubEvent value) throws 
Exception {
+                                       return value.getName().equals("middle");
+                               }
+                       }
+               ).followedByAny("end").where(new SimpleCondition<Event>() {
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               });
+
+               DataStream<String> result =
+                       CEP.pattern(input, pattern).flatSelect(new 
RichPatternFlatSelectFunction<Event, String>() {
+
+                               @Override
+                               public void open(Configuration config) {
+                                       try {
+                                               
getRuntimeContext().getMapState(new MapStateDescriptor<>(
+                                                       "test",
+                                                       LongSerializer.INSTANCE,
+                                                       
LongSerializer.INSTANCE));
+                                               throw new 
RuntimeException("Expected getMapState to fail with unsupported operation 
exception.");
+                                       } catch (UnsupportedOperationException 
e) {
+                                               // ignore, expected
+                                       }
+
+                                       
getRuntimeContext().getUserCodeClassLoader();
+                               }
+
+                               @Override
+                               public void flatSelect(Map<String, List<Event>> 
p, Collector<String> o) throws Exception {
+                                       StringBuilder builder = new 
StringBuilder();
+
+                                       
builder.append(p.get("start").get(0).getId()).append(",")
+                                               
.append(p.get("middle").get(0).getId()).append(",")
+                                               
.append(p.get("end").get(0).getId());
+
+                                       o.collect(builder.toString());
+                               }
+                       }, Types.STRING);
+
+               List<String> resultList = new ArrayList<>();
+
+               
DataStreamUtils.collect(result).forEachRemaining(resultList::add);
+
+               assertEquals(Arrays.asList("2,6,8"), resultList);
+       }
+
+       @Test
+       public void testRichPatternSelectFunction() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(2);
+
+               DataStream<Event> input = env.fromElements(
+                       new Event(1, "barfoo", 1.0),
+                       new Event(2, "start", 2.0),
+                       new Event(3, "start", 2.1),
+                       new Event(3, "foobar", 3.0),
+                       new SubEvent(4, "foo", 4.0, 1.0),
+                       new SubEvent(3, "middle", 3.2, 1.0),
+                       new Event(42, "start", 3.1),
+                       new SubEvent(42, "middle", 3.3, 1.2),
+                       new Event(5, "middle", 5.0),
+                       new SubEvent(2, "middle", 6.0, 2.0),
+                       new SubEvent(7, "bar", 3.0, 3.0),
+                       new Event(42, "42", 42.0),
+                       new Event(3, "end", 2.0),
+                       new Event(2, "end", 1.0),
+                       new Event(42, "end", 42.0)
+               ).keyBy(new KeySelector<Event, Integer>() {
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               });
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new RichIterativeCondition<Event>() {
+
+                       @Override
+                       public boolean filter(Event value, Context<Event> ctx) 
throws Exception {
+                               return value.getName().equals("start");
+                       }
+               }).followedByAny("middle").subtype(SubEvent.class).where(
+                       new SimpleCondition<SubEvent>() {
+
+                               @Override
+                               public boolean filter(SubEvent value) throws 
Exception {
+                                       return value.getName().equals("middle");
+                               }
+                       }
+               ).followedByAny("end").where(new SimpleCondition<Event>() {
+
+                               @Override
+                               public boolean filter(Event value) throws 
Exception {
+                                       return value.getName().equals("end");
+                               }
+                       });
+
+               DataStream<String> result = CEP.pattern(input, 
pattern).select(new RichPatternSelectFunction<Event, String>() {
+                       @Override
+                       public void open(Configuration config) {
+                               try {
+                                       getRuntimeContext().getMapState(new 
MapStateDescriptor<>(
+                                               "test",
+                                               LongSerializer.INSTANCE,
+                                               LongSerializer.INSTANCE));
+                                       throw new RuntimeException("Expected 
getMapState to fail with unsupported operation exception.");
+                               } catch (UnsupportedOperationException e) {
+                                       // ignore, expected
+                               }
+
+                               getRuntimeContext().getUserCodeClassLoader();
+                       }
+
+                       @Override
+                       public String select(Map<String, List<Event>> p) throws 
Exception {
+                               StringBuilder builder = new StringBuilder();
+
+                               
builder.append(p.get("start").get(0).getId()).append(",")
+                                       
.append(p.get("middle").get(0).getId()).append(",")
+                                       .append(p.get("end").get(0).getId());
+
+                               return builder.toString();
+                       }
+               });
+
+               List<String> resultList = new ArrayList<>();
+
+               
DataStreamUtils.collect(result).forEachRemaining(resultList::add);
+
+               resultList.sort(String::compareTo);
+
+               assertEquals(Arrays.asList("2,2,2", "3,3,3", "42,42,42"), 
resultList);
+       }
 }
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
new file mode 100644
index 00000000000..6bc4081da54
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichNotCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link CepRuntimeContext}.
+ */
+public class CepRuntimeContextTest {
+
+       @Test
+       public void testRichCompositeIterativeCondition() throws Exception {
+               RichIterativeCondition<Integer> first = new 
TestRichIterativeCondition();
+               RichIterativeCondition<Integer> second = new 
TestRichIterativeCondition();
+               RichIterativeCondition<Integer> third = new 
TestRichIterativeCondition();
+
+               RichCompositeIterativeCondition function = new 
RichCompositeIterativeCondition(first, second, third) {
+                       @Override
+                       public boolean filter(Object value, Context ctx) throws 
Exception {
+                               return false;
+                       }
+               };
+               function.setRuntimeContext(mock(RuntimeContext.class));
+
+               assertTrue(first.getRuntimeContext() instanceof 
CepRuntimeContext);
+               assertTrue(second.getRuntimeContext() instanceof 
CepRuntimeContext);
+               assertTrue(third.getRuntimeContext() instanceof 
CepRuntimeContext);
+       }
+
+       @Test
+       public void testRichAndCondition() throws Exception {
+               RichIterativeCondition<Integer> left = new 
TestRichIterativeCondition();
+               RichIterativeCondition<Integer> right = new 
TestRichIterativeCondition();
+
+               RichAndCondition function = new RichAndCondition<>(left, right);
+               function.setRuntimeContext(mock(RuntimeContext.class));
+
+               assertTrue(left.getRuntimeContext() instanceof 
CepRuntimeContext);
+               assertTrue(right.getRuntimeContext() instanceof 
CepRuntimeContext);
+       }
+
+       @Test
+       public void testRichOrCondition() throws Exception {
+               RichIterativeCondition<Integer> left = new 
TestRichIterativeCondition();
+               RichIterativeCondition<Integer> right = new 
TestRichIterativeCondition();
+
+               RichOrCondition function = new RichOrCondition<>(left, right);
+               function.setRuntimeContext(mock(RuntimeContext.class));
+
+               assertTrue(left.getRuntimeContext() instanceof 
CepRuntimeContext);
+               assertTrue(right.getRuntimeContext() instanceof 
CepRuntimeContext);
+       }
+
+       @Test
+       public void testRichNotCondition() {
+               RichIterativeCondition<Integer> original = new 
TestRichIterativeCondition();
+
+               RichNotCondition function = new RichNotCondition<>(original);
+               function.setRuntimeContext(mock(RuntimeContext.class));
+
+               assertTrue(original.getRuntimeContext() instanceof 
CepRuntimeContext);
+       }
+
+       @Test
+       public void testRichPatternSelectFunction() {
+               verifyRuntimeContext(new TestRichPatternSelectFunction());
+       }
+
+       @Test
+       public void testRichPatternFlatSelectFunction() {
+               verifyRuntimeContext(new TestRichPatternFlatSelectFunction());
+       }
+
+       @Test
+       public void testRichIterativeCondition() {
+               verifyRuntimeContext(new TestRichIterativeCondition());
+       }
+
+       private void verifyRuntimeContext(final RichFunction function) {
+               final String taskName = "foobarTask";
+               final MetricGroup metricGroup = new UnregisteredMetricsGroup();
+               final int numberOfParallelSubtasks = 42;
+               final int indexOfSubtask = 43;
+               final int attemptNumber = 1337;
+               final String taskNameWithSubtask = "barfoo";
+               final ExecutionConfig executionConfig = 
mock(ExecutionConfig.class);
+               final ClassLoader userCodeClassLoader = mock(ClassLoader.class);
+
+               RuntimeContext mockedRuntimeContext = 
mock(RuntimeContext.class);
+
+               when(mockedRuntimeContext.getTaskName()).thenReturn(taskName);
+               
when(mockedRuntimeContext.getMetricGroup()).thenReturn(metricGroup);
+               
when(mockedRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(numberOfParallelSubtasks);
+               
when(mockedRuntimeContext.getIndexOfThisSubtask()).thenReturn(indexOfSubtask);
+               
when(mockedRuntimeContext.getAttemptNumber()).thenReturn(attemptNumber);
+               
when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenReturn(taskNameWithSubtask);
+               
when(mockedRuntimeContext.getExecutionConfig()).thenReturn(executionConfig);
+               
when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
+
+               function.setRuntimeContext(mockedRuntimeContext);
+
+               RuntimeContext runtimeContext = function.getRuntimeContext();
+
+               assertTrue(runtimeContext instanceof CepRuntimeContext);
+               assertEquals(taskName, runtimeContext.getTaskName());
+               assertEquals(metricGroup, runtimeContext.getMetricGroup());
+               assertEquals(numberOfParallelSubtasks, 
runtimeContext.getNumberOfParallelSubtasks());
+               assertEquals(indexOfSubtask, 
runtimeContext.getIndexOfThisSubtask());
+               assertEquals(attemptNumber, runtimeContext.getAttemptNumber());
+               assertEquals(taskNameWithSubtask, 
runtimeContext.getTaskNameWithSubtasks());
+               assertEquals(executionConfig, 
runtimeContext.getExecutionConfig());
+               assertEquals(userCodeClassLoader, 
runtimeContext.getUserCodeClassLoader());
+
+               try {
+                       runtimeContext.getDistributedCache();
+                       fail("Expected getDistributedCached to fail with 
unsupported operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getState(new 
ValueStateDescriptor<>("foobar", Integer.class, 42));
+                       fail("Expected getState to fail with unsupported 
operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getListState(new 
ListStateDescriptor<>("foobar", Integer.class));
+                       fail("Expected getListState to fail with unsupported 
operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getReducingState(new 
ReducingStateDescriptor<>(
+                               "foobar",
+                               mock(ReduceFunction.class),
+                               Integer.class));
+                       fail("Expected getReducingState to fail with 
unsupported operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getAggregatingState(new 
AggregatingStateDescriptor<>(
+                               "foobar",
+                               mock(AggregateFunction.class),
+                               Integer.class));
+                       fail("Expected getAggregatingState to fail with 
unsupported operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getFoldingState(new 
FoldingStateDescriptor<>(
+                               "foobar",
+                               0,
+                               mock(FoldFunction.class),
+                               Integer.class));
+                       fail("Expected getFoldingState to fail with unsupported 
operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getMapState(new 
MapStateDescriptor<>("foobar", Integer.class, String.class));
+                       fail("Expected getMapState to fail with unsupported 
operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.addAccumulator("foobar", 
mock(Accumulator.class));
+                       fail("Expected addAccumulator to fail with unsupported 
operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getAccumulator("foobar");
+                       fail("Expected getAccumulator to fail with unsupported 
operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getAllAccumulators();
+                       fail("Expected getAllAccumulators to fail with 
unsupported operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getIntCounter("foobar");
+                       fail("Expected getIntCounter to fail with unsupported 
operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getLongCounter("foobar");
+                       fail("Expected getLongCounter to fail with unsupported 
operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getDoubleCounter("foobar");
+                       fail("Expected getDoubleCounter to fail with 
unsupported operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getHistogram("foobar");
+                       fail("Expected getHistogram to fail with unsupported 
operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.hasBroadcastVariable("foobar");
+                       fail("Expected hasBroadcastVariable to fail with 
unsupported operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getBroadcastVariable("foobar");
+                       fail("Expected getBroadcastVariable to fail with 
unsupported operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       runtimeContext.getBroadcastVariableWithInitializer(
+                               "foobar",
+                               mock(BroadcastVariableInitializer.class));
+                       fail("Expected getBroadcastVariableWithInitializer to 
fail with unsupported operation exception.");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+       }
+
+       private static class TestRichIterativeCondition extends 
RichIterativeCondition<Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean filter(Integer value, Context<Integer> ctx) 
throws Exception {
+                       return false;
+               }
+       }
+
+       private static class TestRichPatternSelectFunction extends 
RichPatternSelectFunction<Integer, Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Integer select(Map<String, List<Integer>> pattern) 
throws Exception {
+                       return null;
+               }
+       }
+
+       private static class TestRichPatternFlatSelectFunction extends 
RichPatternFlatSelectFunction<Integer, Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatSelect(Map<String, List<Integer>> pattern, 
Collector<Integer> out) throws Exception {
+                       // no op
+               }
+       }
+}
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 6d93ff3a3b4..743425226f2 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -21,7 +21,9 @@
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
-import org.apache.flink.cep.pattern.conditions.OrCondition;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
 import org.apache.flink.util.TestLogger;
@@ -33,6 +35,7 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests for constructing {@link Pattern}.
@@ -102,7 +105,7 @@ public boolean filter(Event value) throws Exception {
 
                assertNotNull(pattern.getCondition());
                assertNotNull(previous.getCondition());
-               assertNull(previous2.getCondition());
+               assertNotNull(previous2.getCondition());
 
                assertEquals(pattern.getName(), "end");
                assertEquals(previous.getName(), "next");
@@ -187,14 +190,27 @@ public boolean filter(Event value) throws Exception {
                assertNull(previous2.getPrevious());
 
                assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, 
pattern.getQuantifier().getConsumingStrategy());
-               assertFalse(previous.getCondition() instanceof OrCondition);
-               assertTrue(previous2.getCondition() instanceof OrCondition);
+               assertFalse(previous.getCondition() instanceof RichOrCondition);
+               assertTrue(previous2.getCondition() instanceof RichOrCondition);
 
                assertEquals(pattern.getName(), "end");
                assertEquals(previous.getName(), "or");
                assertEquals(previous2.getName(), "start");
        }
 
+       @Test
+       public void testRichCondition() {
+               Pattern<Event, Event> pattern =
+                       Pattern.<Event>begin("start")
+                               .where(mock(IterativeCondition.class))
+                               .where(mock(IterativeCondition.class))
+                       .followedBy("end")
+                               .where(mock(IterativeCondition.class))
+                               .or(mock(IterativeCondition.class));
+               assertTrue(pattern.getCondition() instanceof RichOrCondition);
+               assertTrue(pattern.getPrevious().getCondition() instanceof 
RichAndCondition);
+       }
+
        @Test(expected = IllegalArgumentException.class)
        public void testPatternTimesNegativeTimes() throws Exception {
                Pattern.begin("start").where(dummyCondition()).times(-1);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> ------------------------------------------------------------------
>
>                 Key: FLINK-8159
>                 URL: https://issues.apache.org/jira/browse/FLINK-8159
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to