[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701924#comment-16701924 ]
ASF GitHub Bot commented on FLINK-8159: --------------------------------------- dawidwys closed pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala index 33d24de7748..91a15032a91 100644 --- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala @@ -113,7 +113,7 @@ class PatternTest { assertTrue(pattern.getCondition.isDefined) assertTrue(previous.getCondition.isDefined) - assertFalse(preprevious.getCondition.isDefined) + assertTrue(preprevious.getCondition.isDefined) assertEquals(pattern.getName, "end") assertEquals(previous.getName, "next") diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java new file mode 100644 index 00000000000..ff6e610845c --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}. + * The runtime context only supports basic operations. Consequently, state access, accumulators, + * broadcast variables and the distributed cache are disabled. + */ +public class CepRuntimeContext implements RuntimeContext { + + private final RuntimeContext runtimeContext; + + public CepRuntimeContext(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + @Override + public String getTaskName() { + return runtimeContext.getTaskName(); + } + + @Override + public MetricGroup getMetricGroup() { + return runtimeContext.getMetricGroup(); + } + + @Override + public int getNumberOfParallelSubtasks() { + return runtimeContext.getNumberOfParallelSubtasks(); + } + + @Override + public int getMaxNumberOfParallelSubtasks() { + return runtimeContext.getMaxNumberOfParallelSubtasks(); + } + + @Override + public int getIndexOfThisSubtask() { + return runtimeContext.getIndexOfThisSubtask(); + } + + @Override + public int getAttemptNumber() { + return runtimeContext.getAttemptNumber(); + } + + @Override + public String getTaskNameWithSubtasks() { + return runtimeContext.getTaskNameWithSubtasks(); + } + + @Override + public ExecutionConfig getExecutionConfig() { + return runtimeContext.getExecutionConfig(); + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return runtimeContext.getUserCodeClassLoader(); + } + + // ----------------------------------------------------------------------------------- + // Unsupported operations + // ----------------------------------------------------------------------------------- + + @Override + public <V, A extends Serializable> void addAccumulator( + String name, Accumulator<V, A> accumulator) { + throw new UnsupportedOperationException("Accumulators are not supported."); + } + + @Override + public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) { + throw new UnsupportedOperationException("Accumulators are not supported."); + } + + @Override + public Map<String, Accumulator<?, ?>> getAllAccumulators() { + throw new UnsupportedOperationException("Accumulators are not supported."); + } + + @Override + public IntCounter getIntCounter(String name) { + throw new UnsupportedOperationException("Int counters are not supported."); + } + + @Override + public LongCounter getLongCounter(String name) { + throw new UnsupportedOperationException("Long counters are not supported."); + } + + @Override + public DoubleCounter getDoubleCounter(String name) { + throw new UnsupportedOperationException("Double counters are not supported."); + } + + @Override + public Histogram getHistogram(String name) { + throw new UnsupportedOperationException("Histograms are not supported."); + } + + @Override + public boolean hasBroadcastVariable(String name) { + throw new UnsupportedOperationException("Broadcast variables are not supported."); + } + + @Override + public <RT> List<RT> getBroadcastVariable(String name) { + throw new UnsupportedOperationException("Broadcast variables are not supported."); + } + + @Override + public <T, C> C getBroadcastVariableWithInitializer( + String name, BroadcastVariableInitializer<T, C> initializer) { + throw new UnsupportedOperationException("Broadcast variables are not supported."); + } + + @Override + public DistributedCache getDistributedCache() { + throw new UnsupportedOperationException("Distributed cache is not supported."); + } + + @Override + public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } + + @Override + public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } + + @Override + public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } + + @Override + public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } + + @Override + public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } + + @Override + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java new file mode 100644 index 00000000000..a2b89c37e97 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.List; +import java.util.Map; + +/** + * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. + * + * @param <IN> Type of the input elements + * @param <OUT> Type of the output element + */ +public abstract class RichPatternFlatSelectFunction<IN, OUT> + extends AbstractRichFunction + implements PatternFlatSelectFunction<IN, OUT> { + + private static final long serialVersionUID = 1L; + + @Override + public void setRuntimeContext(RuntimeContext runtimeContext) { + Preconditions.checkNotNull(runtimeContext); + + if (runtimeContext instanceof CepRuntimeContext) { + super.setRuntimeContext(runtimeContext); + } else { + super.setRuntimeContext(new CepRuntimeContext(runtimeContext)); + } + } + + public abstract void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception; +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java new file mode 100644 index 00000000000..ce694a38e05 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Preconditions; + +import java.util.List; +import java.util.Map; + +/** + * Rich variant of the {@link PatternSelectFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. + * + * @param <IN> Type of the input elements + * @param <OUT> Type of the output element + */ +public abstract class RichPatternSelectFunction<IN, OUT> + extends AbstractRichFunction + implements PatternSelectFunction<IN, OUT> { + + private static final long serialVersionUID = 1L; + + @Override + public void setRuntimeContext(RuntimeContext runtimeContext) { + Preconditions.checkNotNull(runtimeContext); + + if (runtimeContext instanceof CepRuntimeContext) { + super.setRuntimeContext(runtimeContext); + } else { + super.setRuntimeContext(new CepRuntimeContext(runtimeContext)); + } + } + + public abstract OUT select(Map<String, List<IN>> pattern) throws Exception; +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index ed2ff2e7e59..bc692f2bc0c 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -29,10 +29,10 @@ import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.Quantifier; import org.apache.flink.cep.pattern.Quantifier.Times; -import org.apache.flink.cep.pattern.conditions.AndCondition; import org.apache.flink.cep.pattern.conditions.BooleanConditions; import org.apache.flink.cep.pattern.conditions.IterativeCondition; -import org.apache.flink.cep.pattern.conditions.NotCondition; +import org.apache.flink.cep.pattern.conditions.RichAndCondition; +import org.apache.flink.cep.pattern.conditions.RichNotCondition; import org.apache.flink.streaming.api.windowing.time.Time; import java.io.Serializable; @@ -288,9 +288,9 @@ private void checkPatternNameUniqueness(final Pattern pattern) { if (lastSink.isFinal()) { //so that the proceed to final is not fired - notNext.addIgnore(lastSink, new NotCondition<>(notCondition)); + notNext.addIgnore(lastSink, new RichNotCondition<>(notCondition)); } else { - notNext.addProceed(lastSink, new NotCondition<>(notCondition)); + notNext.addProceed(lastSink, new RichNotCondition<>(notCondition)); } notNext.addProceed(stopState, notCondition); lastSink = notNext; @@ -612,11 +612,11 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) { if (untilCondition != null) { singletonState.addProceed( originalStateMap.get(proceedState.getName()), - new AndCondition<>(proceedCondition, untilCondition)); + new RichAndCondition<>(proceedCondition, untilCondition)); } singletonState.addProceed(proceedState, untilCondition != null - ? new AndCondition<>(proceedCondition, new NotCondition<>(untilCondition)) + ? new RichAndCondition<>(proceedCondition, new RichNotCondition<>(untilCondition)) : proceedCondition); } else { singletonState.addProceed(proceedState, proceedCondition); @@ -734,12 +734,12 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) { if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { if (untilCondition != null) { State<T> sinkStateCopy = copy(sinkState); - loopingState.addProceed(sinkStateCopy, new AndCondition<>(proceedCondition, untilCondition)); + loopingState.addProceed(sinkStateCopy, new RichAndCondition<>(proceedCondition, untilCondition)); originalStateMap.put(sinkState.getName(), sinkStateCopy); } loopingState.addProceed(sinkState, untilCondition != null - ? new AndCondition<>(proceedCondition, new NotCondition<>(untilCondition)) + ? new RichAndCondition<>(proceedCondition, new RichNotCondition<>(untilCondition)) : proceedCondition); updateWithGreedyCondition(sinkState, getTakeCondition(currentPattern)); } else { @@ -774,9 +774,9 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) { IterativeCondition<T> untilCondition, boolean isTakeCondition) { if (untilCondition != null && condition != null) { - return new AndCondition<>(new NotCondition<>(untilCondition), condition); + return new RichAndCondition<>(new RichNotCondition<>(untilCondition), condition); } else if (untilCondition != null && isTakeCondition) { - return new NotCondition<>(untilCondition); + return new RichNotCondition<>(untilCondition); } return condition; @@ -802,7 +802,7 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) { innerIgnoreCondition = null; break; case SKIP_TILL_NEXT: - innerIgnoreCondition = new NotCondition<>((IterativeCondition<T>) pattern.getCondition()); + innerIgnoreCondition = new RichNotCondition<>((IterativeCondition<T>) pattern.getCondition()); break; case SKIP_TILL_ANY: innerIgnoreCondition = BooleanConditions.trueFunction(); @@ -843,7 +843,7 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) { ignoreCondition = null; break; case SKIP_TILL_NEXT: - ignoreCondition = new NotCondition<>((IterativeCondition<T>) pattern.getCondition()); + ignoreCondition = new RichNotCondition<>((IterativeCondition<T>) pattern.getCondition()); break; case SKIP_TILL_ANY: ignoreCondition = BooleanConditions.trueFunction(); @@ -896,7 +896,7 @@ private void updateWithGreedyCondition( IterativeCondition<T> takeCondition) { for (StateTransition<T> stateTransition : state.getStateTransitions()) { stateTransition.setCondition( - new AndCondition<>(stateTransition.getCondition(), new NotCondition<>(takeCondition))); + new RichAndCondition<>(stateTransition.getCondition(), new RichNotCondition<>(takeCondition))); } } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index b57c3fe0b2f..c603741cea5 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; @@ -33,10 +34,14 @@ import org.apache.flink.cep.nfa.NFA.MigratedNFA; import org.apache.flink.cep.nfa.NFAState; import org.apache.flink.cep.nfa.NFAStateSerializer; +import org.apache.flink.cep.nfa.State; +import org.apache.flink.cep.nfa.StateTransition; import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer; import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.KeyedStateFunction; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.VoidNamespace; @@ -185,6 +190,15 @@ public void open() throws Exception { this); this.nfa = nfaFactory.createNFA(); + + openNFA(nfa); + } + + @Override + public void close() throws Exception { + super.close(); + + closeNFA(nfa); } @Override @@ -395,6 +409,26 @@ private void advanceTime(NFAState nfaState, long timestamp) throws Exception { } } + private void openNFA(NFA<IN> nfa) throws Exception { + Configuration conf = new Configuration(); + for (State<IN> state : nfa.getStates()) { + for (StateTransition<IN> transition : state.getStateTransitions()) { + IterativeCondition condition = transition.getCondition(); + FunctionUtils.setFunctionRuntimeContext(condition, getRuntimeContext()); + FunctionUtils.openFunction(condition, conf); + } + } + } + + private void closeNFA(NFA<IN> nfa) throws Exception { + for (State<IN> state : nfa.getStates()) { + for (StateTransition<IN> transition : state.getStateTransitions()) { + IterativeCondition condition = transition.getCondition(); + FunctionUtils.closeFunction(condition); + } + } + } + protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception; protected void processTimedOutSequences( diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 6ad9d9a1669..64236765caf 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -23,9 +23,10 @@ import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy; import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy; import org.apache.flink.cep.pattern.Quantifier.Times; -import org.apache.flink.cep.pattern.conditions.AndCondition; +import org.apache.flink.cep.pattern.conditions.BooleanConditions; import org.apache.flink.cep.pattern.conditions.IterativeCondition; -import org.apache.flink.cep.pattern.conditions.OrCondition; +import org.apache.flink.cep.pattern.conditions.RichAndCondition; +import org.apache.flink.cep.pattern.conditions.RichOrCondition; import org.apache.flink.cep.pattern.conditions.SubtypeCondition; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Preconditions; @@ -105,7 +106,11 @@ public Quantifier getQuantifier() { } public IterativeCondition<F> getCondition() { - return condition; + if (condition != null) { + return condition; + } else { + return BooleanConditions.trueFunction(); + } } public IterativeCondition<F> getUntilCondition() { @@ -154,7 +159,7 @@ public Quantifier getQuantifier() { if (this.condition == null) { this.condition = condition; } else { - this.condition = new AndCondition<>(this.condition, condition); + this.condition = new RichAndCondition<>(this.condition, condition); } return this; } @@ -177,7 +182,7 @@ public Quantifier getQuantifier() { if (this.condition == null) { this.condition = condition; } else { - this.condition = new OrCondition<>(this.condition, condition); + this.condition = new RichOrCondition<>(this.condition, condition); } return this; } @@ -196,7 +201,7 @@ public Quantifier getQuantifier() { if (condition == null) { this.condition = new SubtypeCondition<F>(subtypeClass); } else { - this.condition = new AndCondition<>(condition, new SubtypeCondition<F>(subtypeClass)); + this.condition = new RichAndCondition<>(condition, new SubtypeCondition<F>(subtypeClass)); } @SuppressWarnings("unchecked") diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java index ac34c41301b..4622417bcf1 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java @@ -18,6 +18,7 @@ package org.apache.flink.cep.pattern.conditions; +import org.apache.flink.annotation.Internal; import org.apache.flink.util.Preconditions; /** @@ -25,7 +26,11 @@ * {@code AND} and returns {@code true} if both are {@code true}. * * @param <T> Type of the element to filter + * @deprecated Please use {@link RichAndCondition} instead. This class exists just for + * backwards compatibility and will be removed in FLINK-10113. */ +@Internal +@Deprecated public class AndCondition<T> extends IterativeCondition<T> { private static final long serialVersionUID = -2471892317390197319L; diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java index aea5a3bdd43..17c443fa0d9 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java @@ -18,10 +18,13 @@ package org.apache.flink.cep.pattern.conditions; +import org.apache.flink.annotation.Internal; + /** * Utility class containing an {@link IterativeCondition} that always returns * {@code true} and one that always returns {@code false}. */ +@Internal public class BooleanConditions { /** diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java index 9318c2f6772..72dc4bb8697 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java @@ -18,12 +18,18 @@ package org.apache.flink.cep.pattern.conditions; +import org.apache.flink.annotation.Internal; + /** * A {@link IterativeCondition condition} which negates the condition it wraps * and returns {@code true} if the original condition returns {@code false}. * * @param <T> Type of the element to filter + * @deprecated Please use {@link RichNotCondition} instead. This class exists just for + * backwards compatibility and will be removed in FLINK-10113. */ +@Internal +@Deprecated public class NotCondition<T> extends IterativeCondition<T> { private static final long serialVersionUID = -2109562093871155005L; diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java index d3690ab4da0..ac8c465c59f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java @@ -18,6 +18,7 @@ package org.apache.flink.cep.pattern.conditions; +import org.apache.flink.annotation.Internal; import org.apache.flink.util.Preconditions; /** @@ -25,7 +26,11 @@ * {@code OR} and returns {@code true} if at least one is {@code true}. * * @param <T> Type of the element to filter + * @deprecated Please use {@link RichOrCondition} instead. This class exists just for + * backwards compatibility and will be removed in FLINK-10113. */ +@Internal +@Deprecated public class OrCondition<T> extends IterativeCondition<T> { private static final long serialVersionUID = 2554610954278485106L; diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java new file mode 100644 index 00000000000..1d5d1e7367d --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.annotation.Internal; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param <T> Type of the element to filter + */ +@Internal +public class RichAndCondition<T> extends RichCompositeIterativeCondition<T> { + + private static final long serialVersionUID = 1L; + + public RichAndCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context<T> ctx) throws Exception { + return getLeft().filter(value, ctx) && getRight().filter(value, ctx); + } + + /** + * @return One of the {@link IterativeCondition conditions} combined in this condition. + */ + public IterativeCondition<T> getLeft() { + return getNestedConditions()[0]; + } + + /** + * @return One of the {@link IterativeCondition conditions} combined in this condition. + */ + public IterativeCondition<T> getRight() { + return getNestedConditions()[1]; + } +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichCompositeIterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichCompositeIterativeCondition.java new file mode 100644 index 00000000000..5f7789adba6 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichCompositeIterativeCondition.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; + +/** + * A base class of composite {@link IterativeCondition} conditions such as {@link RichAndCondition}, + * {@link RichOrCondition} and {@link RichNotCondition}, etc. It handles the open, close and + * setRuntimeContext for the nested {@link IterativeCondition} conditions. + * + * @param <T> Type of the element to filter + */ +public abstract class RichCompositeIterativeCondition<T> extends RichIterativeCondition<T> { + + private static final long serialVersionUID = 1L; + + private final IterativeCondition<T>[] nestedConditions; + + @SafeVarargs + public RichCompositeIterativeCondition(final IterativeCondition<T>... nestedConditions) { + for (IterativeCondition<T> condition : nestedConditions) { + Preconditions.checkNotNull(condition, "The condition cannot be null."); + } + this.nestedConditions = nestedConditions; + } + + public IterativeCondition<T>[] getNestedConditions() { + return nestedConditions; + } + + @Override + public void setRuntimeContext(RuntimeContext t) { + super.setRuntimeContext(t); + + for (IterativeCondition<T> nestedCondition : nestedConditions) { + FunctionUtils.setFunctionRuntimeContext(nestedCondition, t); + } + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + for (IterativeCondition<T> nestedCondition : nestedConditions) { + FunctionUtils.openFunction(nestedCondition, parameters); + } + } + + @Override + public void close() throws Exception { + super.close(); + for (IterativeCondition<T> nestedCondition : nestedConditions) { + FunctionUtils.closeFunction(nestedCondition); + } + } +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java new file mode 100644 index 00000000000..12f017e9946 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.cep.CepRuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; + +/** + * Rich variant of the {@link IterativeCondition}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. + */ +public abstract class RichIterativeCondition<T> + extends IterativeCondition<T> + implements RichFunction { + + private static final long serialVersionUID = 1L; + + // -------------------------------------------------------------------------------------------- + // Runtime context access + // -------------------------------------------------------------------------------------------- + + private transient RuntimeContext runtimeContext; + + @Override + public void setRuntimeContext(RuntimeContext runtimeContext) { + Preconditions.checkNotNull(runtimeContext); + + if (runtimeContext instanceof CepRuntimeContext) { + this.runtimeContext = runtimeContext; + } else { + this.runtimeContext = new CepRuntimeContext(runtimeContext); + } + } + + @Override + public RuntimeContext getRuntimeContext() { + if (this.runtimeContext != null) { + return this.runtimeContext; + } else { + throw new IllegalStateException("The runtime context has not been initialized."); + } + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + throw new UnsupportedOperationException("Not support to get the IterationRuntimeContext in IterativeCondition."); + } + + // -------------------------------------------------------------------------------------------- + // Default life cycle methods + // -------------------------------------------------------------------------------------------- + + @Override + public void open(Configuration parameters) throws Exception {} + + @Override + public void close() throws Exception {} + +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java new file mode 100644 index 00000000000..a4929eb5e15 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.annotation.Internal; + +/** + * A {@link RichIterativeCondition condition} which negates the condition it wraps + * and returns {@code true} if the original condition returns {@code false}. + * + * @param <T> Type of the element to filter + */ +@Internal +public class RichNotCondition<T> extends RichCompositeIterativeCondition<T> { + + private static final long serialVersionUID = 1L; + + public RichNotCondition(final IterativeCondition<T> original) { + super(original); + } + + @Override + public boolean filter(T value, Context<T> ctx) throws Exception { + return !getNestedConditions()[0].filter(value, ctx); + } +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java new file mode 100644 index 00000000000..03bd1e73092 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.annotation.Internal; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code OR} and returns {@code true} if at least one is {@code true}. + * + * @param <T> Type of the element to filter + */ +@Internal +public class RichOrCondition<T> extends RichCompositeIterativeCondition<T> { + + private static final long serialVersionUID = 1L; + + public RichOrCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context<T> ctx) throws Exception { + return getLeft().filter(value, ctx) || getRight().filter(value, ctx); + } + + /** + * @return One of the {@link IterativeCondition conditions} combined in this condition. + */ + public IterativeCondition<T> getLeft() { + return getNestedConditions()[0]; + } + + /** + * @return One of the {@link IterativeCondition conditions} combined in this condition. + */ + public IterativeCondition<T> getRight() { + return getNestedConditions()[1]; + } +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java index 9ca52c5c8d8..de46d1f756d 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java @@ -18,6 +18,7 @@ package org.apache.flink.cep.pattern.conditions; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.FilterFunction; /** @@ -28,6 +29,7 @@ * previously accepted elements in the pattern. Conditions that extend this class are simple {@code filter(...)} * functions that decide based on the properties of the element at hand. */ +@Internal public abstract class SimpleCondition<T> extends IterativeCondition<T> implements FilterFunction<T> { private static final long serialVersionUID = 4942618239408140245L; diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java index cff8693c588..249757d8e59 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java @@ -18,6 +18,7 @@ package org.apache.flink.cep.pattern.conditions; +import org.apache.flink.annotation.Internal; import org.apache.flink.util.Preconditions; /** @@ -26,6 +27,7 @@ * * @param <T> Type of the elements to be filtered */ +@Internal public class SubtypeCondition<T> extends SimpleCondition<T> { private static final long serialVersionUID = -2990017519957561355L; diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index e397d318241..33c1fdd571c 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -19,13 +19,17 @@ package org.apache.flink.cep; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.RichIterativeCondition; import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamUtils; @@ -35,6 +39,7 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.types.Either; +import org.apache.flink.util.Collector; import org.junit.Test; @@ -716,4 +721,167 @@ public boolean filter(Tuple2<Integer, String> rec) throws Exception { assertEquals(expected, resultList); } + + @Test + public void testRichPatternFlatSelectFunction() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Event> input = env.fromElements( + new Event(1, "barfoo", 1.0), + new Event(2, "start", 2.0), + new Event(3, "foobar", 3.0), + new SubEvent(4, "foo", 4.0, 1.0), + new Event(5, "middle", 5.0), + new SubEvent(6, "middle", 6.0, 2.0), + new SubEvent(7, "bar", 3.0, 3.0), + new Event(42, "42", 42.0), + new Event(8, "end", 1.0) + ); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new RichIterativeCondition<Event>() { + + @Override + public boolean filter(Event value, Context<Event> ctx) throws Exception { + return value.getName().equals("start"); + } + }).followedByAny("middle").subtype(SubEvent.class).where( + new SimpleCondition<SubEvent>() { + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().equals("middle"); + } + } + ).followedByAny("end").where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + DataStream<String> result = + CEP.pattern(input, pattern).flatSelect(new RichPatternFlatSelectFunction<Event, String>() { + + @Override + public void open(Configuration config) { + try { + getRuntimeContext().getMapState(new MapStateDescriptor<>( + "test", + LongSerializer.INSTANCE, + LongSerializer.INSTANCE)); + throw new RuntimeException("Expected getMapState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // ignore, expected + } + + getRuntimeContext().getUserCodeClassLoader(); + } + + @Override + public void flatSelect(Map<String, List<Event>> p, Collector<String> o) throws Exception { + StringBuilder builder = new StringBuilder(); + + builder.append(p.get("start").get(0).getId()).append(",") + .append(p.get("middle").get(0).getId()).append(",") + .append(p.get("end").get(0).getId()); + + o.collect(builder.toString()); + } + }, Types.STRING); + + List<String> resultList = new ArrayList<>(); + + DataStreamUtils.collect(result).forEachRemaining(resultList::add); + + assertEquals(Arrays.asList("2,6,8"), resultList); + } + + @Test + public void testRichPatternSelectFunction() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<Event> input = env.fromElements( + new Event(1, "barfoo", 1.0), + new Event(2, "start", 2.0), + new Event(3, "start", 2.1), + new Event(3, "foobar", 3.0), + new SubEvent(4, "foo", 4.0, 1.0), + new SubEvent(3, "middle", 3.2, 1.0), + new Event(42, "start", 3.1), + new SubEvent(42, "middle", 3.3, 1.2), + new Event(5, "middle", 5.0), + new SubEvent(2, "middle", 6.0, 2.0), + new SubEvent(7, "bar", 3.0, 3.0), + new Event(42, "42", 42.0), + new Event(3, "end", 2.0), + new Event(2, "end", 1.0), + new Event(42, "end", 42.0) + ).keyBy(new KeySelector<Event, Integer>() { + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new RichIterativeCondition<Event>() { + + @Override + public boolean filter(Event value, Context<Event> ctx) throws Exception { + return value.getName().equals("start"); + } + }).followedByAny("middle").subtype(SubEvent.class).where( + new SimpleCondition<SubEvent>() { + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().equals("middle"); + } + } + ).followedByAny("end").where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + DataStream<String> result = CEP.pattern(input, pattern).select(new RichPatternSelectFunction<Event, String>() { + @Override + public void open(Configuration config) { + try { + getRuntimeContext().getMapState(new MapStateDescriptor<>( + "test", + LongSerializer.INSTANCE, + LongSerializer.INSTANCE)); + throw new RuntimeException("Expected getMapState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // ignore, expected + } + + getRuntimeContext().getUserCodeClassLoader(); + } + + @Override + public String select(Map<String, List<Event>> p) throws Exception { + StringBuilder builder = new StringBuilder(); + + builder.append(p.get("start").get(0).getId()).append(",") + .append(p.get("middle").get(0).getId()).append(",") + .append(p.get("end").get(0).getId()); + + return builder.toString(); + } + }); + + List<String> resultList = new ArrayList<>(); + + DataStreamUtils.collect(result).forEachRemaining(resultList::add); + + resultList.sort(String::compareTo); + + assertEquals(Arrays.asList("2,2,2", "3,3,3", "42,42,42"), resultList); + } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java new file mode 100644 index 00000000000..6bc4081da54 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.cep.pattern.conditions.RichAndCondition; +import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichNotCondition; +import org.apache.flink.cep.pattern.conditions.RichOrCondition; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.Collector; + +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test cases for {@link CepRuntimeContext}. + */ +public class CepRuntimeContextTest { + + @Test + public void testRichCompositeIterativeCondition() throws Exception { + RichIterativeCondition<Integer> first = new TestRichIterativeCondition(); + RichIterativeCondition<Integer> second = new TestRichIterativeCondition(); + RichIterativeCondition<Integer> third = new TestRichIterativeCondition(); + + RichCompositeIterativeCondition function = new RichCompositeIterativeCondition(first, second, third) { + @Override + public boolean filter(Object value, Context ctx) throws Exception { + return false; + } + }; + function.setRuntimeContext(mock(RuntimeContext.class)); + + assertTrue(first.getRuntimeContext() instanceof CepRuntimeContext); + assertTrue(second.getRuntimeContext() instanceof CepRuntimeContext); + assertTrue(third.getRuntimeContext() instanceof CepRuntimeContext); + } + + @Test + public void testRichAndCondition() throws Exception { + RichIterativeCondition<Integer> left = new TestRichIterativeCondition(); + RichIterativeCondition<Integer> right = new TestRichIterativeCondition(); + + RichAndCondition function = new RichAndCondition<>(left, right); + function.setRuntimeContext(mock(RuntimeContext.class)); + + assertTrue(left.getRuntimeContext() instanceof CepRuntimeContext); + assertTrue(right.getRuntimeContext() instanceof CepRuntimeContext); + } + + @Test + public void testRichOrCondition() throws Exception { + RichIterativeCondition<Integer> left = new TestRichIterativeCondition(); + RichIterativeCondition<Integer> right = new TestRichIterativeCondition(); + + RichOrCondition function = new RichOrCondition<>(left, right); + function.setRuntimeContext(mock(RuntimeContext.class)); + + assertTrue(left.getRuntimeContext() instanceof CepRuntimeContext); + assertTrue(right.getRuntimeContext() instanceof CepRuntimeContext); + } + + @Test + public void testRichNotCondition() { + RichIterativeCondition<Integer> original = new TestRichIterativeCondition(); + + RichNotCondition function = new RichNotCondition<>(original); + function.setRuntimeContext(mock(RuntimeContext.class)); + + assertTrue(original.getRuntimeContext() instanceof CepRuntimeContext); + } + + @Test + public void testRichPatternSelectFunction() { + verifyRuntimeContext(new TestRichPatternSelectFunction()); + } + + @Test + public void testRichPatternFlatSelectFunction() { + verifyRuntimeContext(new TestRichPatternFlatSelectFunction()); + } + + @Test + public void testRichIterativeCondition() { + verifyRuntimeContext(new TestRichIterativeCondition()); + } + + private void verifyRuntimeContext(final RichFunction function) { + final String taskName = "foobarTask"; + final MetricGroup metricGroup = new UnregisteredMetricsGroup(); + final int numberOfParallelSubtasks = 42; + final int indexOfSubtask = 43; + final int attemptNumber = 1337; + final String taskNameWithSubtask = "barfoo"; + final ExecutionConfig executionConfig = mock(ExecutionConfig.class); + final ClassLoader userCodeClassLoader = mock(ClassLoader.class); + + RuntimeContext mockedRuntimeContext = mock(RuntimeContext.class); + + when(mockedRuntimeContext.getTaskName()).thenReturn(taskName); + when(mockedRuntimeContext.getMetricGroup()).thenReturn(metricGroup); + when(mockedRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(numberOfParallelSubtasks); + when(mockedRuntimeContext.getIndexOfThisSubtask()).thenReturn(indexOfSubtask); + when(mockedRuntimeContext.getAttemptNumber()).thenReturn(attemptNumber); + when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenReturn(taskNameWithSubtask); + when(mockedRuntimeContext.getExecutionConfig()).thenReturn(executionConfig); + when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader); + + function.setRuntimeContext(mockedRuntimeContext); + + RuntimeContext runtimeContext = function.getRuntimeContext(); + + assertTrue(runtimeContext instanceof CepRuntimeContext); + assertEquals(taskName, runtimeContext.getTaskName()); + assertEquals(metricGroup, runtimeContext.getMetricGroup()); + assertEquals(numberOfParallelSubtasks, runtimeContext.getNumberOfParallelSubtasks()); + assertEquals(indexOfSubtask, runtimeContext.getIndexOfThisSubtask()); + assertEquals(attemptNumber, runtimeContext.getAttemptNumber()); + assertEquals(taskNameWithSubtask, runtimeContext.getTaskNameWithSubtasks()); + assertEquals(executionConfig, runtimeContext.getExecutionConfig()); + assertEquals(userCodeClassLoader, runtimeContext.getUserCodeClassLoader()); + + try { + runtimeContext.getDistributedCache(); + fail("Expected getDistributedCached to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getState(new ValueStateDescriptor<>("foobar", Integer.class, 42)); + fail("Expected getState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getListState(new ListStateDescriptor<>("foobar", Integer.class)); + fail("Expected getListState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getReducingState(new ReducingStateDescriptor<>( + "foobar", + mock(ReduceFunction.class), + Integer.class)); + fail("Expected getReducingState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getAggregatingState(new AggregatingStateDescriptor<>( + "foobar", + mock(AggregateFunction.class), + Integer.class)); + fail("Expected getAggregatingState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getFoldingState(new FoldingStateDescriptor<>( + "foobar", + 0, + mock(FoldFunction.class), + Integer.class)); + fail("Expected getFoldingState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getMapState(new MapStateDescriptor<>("foobar", Integer.class, String.class)); + fail("Expected getMapState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.addAccumulator("foobar", mock(Accumulator.class)); + fail("Expected addAccumulator to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getAccumulator("foobar"); + fail("Expected getAccumulator to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getAllAccumulators(); + fail("Expected getAllAccumulators to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getIntCounter("foobar"); + fail("Expected getIntCounter to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getLongCounter("foobar"); + fail("Expected getLongCounter to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getDoubleCounter("foobar"); + fail("Expected getDoubleCounter to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getHistogram("foobar"); + fail("Expected getHistogram to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.hasBroadcastVariable("foobar"); + fail("Expected hasBroadcastVariable to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getBroadcastVariable("foobar"); + fail("Expected getBroadcastVariable to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getBroadcastVariableWithInitializer( + "foobar", + mock(BroadcastVariableInitializer.class)); + fail("Expected getBroadcastVariableWithInitializer to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + } + + private static class TestRichIterativeCondition extends RichIterativeCondition<Integer> { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter(Integer value, Context<Integer> ctx) throws Exception { + return false; + } + } + + private static class TestRichPatternSelectFunction extends RichPatternSelectFunction<Integer, Integer> { + private static final long serialVersionUID = 1L; + + @Override + public Integer select(Map<String, List<Integer>> pattern) throws Exception { + return null; + } + } + + private static class TestRichPatternFlatSelectFunction extends RichPatternFlatSelectFunction<Integer, Integer> { + private static final long serialVersionUID = 1L; + + @Override + public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception { + // no op + } + } +} diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java index 6d93ff3a3b4..743425226f2 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -21,7 +21,9 @@ import org.apache.flink.cep.Event; import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy; -import org.apache.flink.cep.pattern.conditions.OrCondition; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichAndCondition; +import org.apache.flink.cep.pattern.conditions.RichOrCondition; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.cep.pattern.conditions.SubtypeCondition; import org.apache.flink.util.TestLogger; @@ -33,6 +35,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; /** * Tests for constructing {@link Pattern}. @@ -102,7 +105,7 @@ public boolean filter(Event value) throws Exception { assertNotNull(pattern.getCondition()); assertNotNull(previous.getCondition()); - assertNull(previous2.getCondition()); + assertNotNull(previous2.getCondition()); assertEquals(pattern.getName(), "end"); assertEquals(previous.getName(), "next"); @@ -187,14 +190,27 @@ public boolean filter(Event value) throws Exception { assertNull(previous2.getPrevious()); assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier().getConsumingStrategy()); - assertFalse(previous.getCondition() instanceof OrCondition); - assertTrue(previous2.getCondition() instanceof OrCondition); + assertFalse(previous.getCondition() instanceof RichOrCondition); + assertTrue(previous2.getCondition() instanceof RichOrCondition); assertEquals(pattern.getName(), "end"); assertEquals(previous.getName(), "or"); assertEquals(previous2.getName(), "start"); } + @Test + public void testRichCondition() { + Pattern<Event, Event> pattern = + Pattern.<Event>begin("start") + .where(mock(IterativeCondition.class)) + .where(mock(IterativeCondition.class)) + .followedBy("end") + .where(mock(IterativeCondition.class)) + .or(mock(IterativeCondition.class)); + assertTrue(pattern.getCondition() instanceof RichOrCondition); + assertTrue(pattern.getPrevious().getCondition() instanceof RichAndCondition); + } + @Test(expected = IllegalArgumentException.class) public void testPatternTimesNegativeTimes() throws Exception { Pattern.begin("start").where(dummyCondition()).times(-1); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > ------------------------------------------------------------------ > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP > Reporter: Dian Fu > Assignee: Dian Fu > Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)