[ 
https://issues.apache.org/jira/browse/FLINK-22803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17353512#comment-17353512
 ] 

Dawid Wysakowicz commented on FLINK-22803:
------------------------------------------

I replied in the ML. Please do not use JIRA for questions.

> Running multiple CEP patterns
> -----------------------------
>
>                 Key: FLINK-22803
>                 URL: https://issues.apache.org/jira/browse/FLINK-22803
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.13.0
>            Reporter: Tejas Budukh
>            Priority: Major
>
> Hi,
> I have asked about this error on Slack, the user mailing list, and
> Stack Overflow, but nobody has responded. I don't know how else to get
> help, hence this ticket.
> We are running into errors when running multiple CEP patterns. Here is our
> use case:
> We plan to build a rule-based engine on top of Flink with a very large
> number of rules, and we are doing a POC for that. For the POC we have around
> 1000 pattern-based rules, which we translate into CEP patterns and run
> on a keyed stream of event data to detect patterns. We partition the
> stream by orgId, and each rule needs to run for each org. Here is the
> code we have written to implement that:
> DataStream<Event> partitionedInput =
>     eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid);
> List<Rule> ruleList = new ArrayList<>();
> for (int i = 0; i < 100; i++) {
>   ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
>   ruleList.add(
>       new Rule("rule" + (i + 500), "process4", "process5", "process6"));
> }
> // Each iteration builds a separate pattern and CEP operator for one rule.
> for (Rule rule : ruleList) {
>   String st = rule.getStart();
>   String mi = rule.getMid();
>   String en = rule.getEnd();
>   String nm = rule.getName();
>   Pattern<Event, ?> pattern =
>       Pattern.begin(
>           Pattern.<Event>begin("start")
>               .where(
>                   new SimpleCondition<Event>() {
>                     @Override
>                     public boolean filter(Event value) throws Exception {
>                       return value.getProcess().equals(st);
>                     }
>                   })
>               .followedBy("middle")
>               .where(
>                   new SimpleCondition<Event>() {
>                     @Override
>                     public boolean filter(Event event) {
>                       return !event.getProcess().equals(mi);
>                     }
>                   })
>               .optional()
>               .followedBy("end")
>               .where(
>                   new SimpleCondition<Event>() {
>                     @Override
>                     public boolean filter(Event event) {
>                       return event.getProcess().equals(en);
>                     }
>                   }));
>   PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
>   DataStream<String> alerts =
>       patternStream.process(
>           new PatternProcessFunction<Event, String>() {
>             @Override
>             public void processMatch(
>                 Map<String, List<Event>> map, Context context, Collector<String> collector)
>                 throws Exception {
>               // "middle" is optional, so any of the keys may be absent from the match.
>               Event start = map.containsKey("start") ? map.get("start").get(0) : null;
>               Event middle = map.containsKey("middle") ? map.get("middle").get(0) : null;
>               Event end = map.containsKey("end") ? map.get("end").get(0) : null;
>               StringJoiner joiner = new StringJoiner(",");
>               joiner
>                   .add("Rule : " + nm + " ")
>                   .add((start == null ? "" : start.getId()))
>                   .add((middle == null ? "" : middle.getId()))
>                   .add((end == null ? "" : end.getId()));
>               collector.collect(joiner.toString());
>             }
>           });
>   alerts.print();
> }
> We ran this code on a Flink cluster with one task manager and 4
> task slots, and the task manager crashed with this error:
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>         at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>         at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>         at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
>         at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
>         at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
>         at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:910)
>         at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:623)
>         at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>         at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>         at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>         at akka.actor.Actor.aroundReceive(Actor.scala:517)
>         at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>         at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
>         at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
>         at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
>         at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
>         at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>         at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>         at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
>         at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>         at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>         at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>         at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>         at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
>         at akka.dispatch.OnComplete.internal(Future.scala:263)
>         at akka.dispatch.OnComplete.internal(Future.scala:261)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>         at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
>         at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
>         at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
>         at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
>         at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>         at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
>         at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>         at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>         at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>         at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>         at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>         at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>         at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>         at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
>         at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
>         at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>         at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:599)
>         at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>         at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         ... 1 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@192.168.0.4:52041/user/rpc/taskmanager_0#-1397184270]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>         at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
>         at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
>         at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>         at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>         at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>         at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>         at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>         at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>         at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>         at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
>         at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
>         ... 1 more
>  
> Can somebody help with this? Why is this code failing? Is our approach
> scalable, or is there a better way of doing this? Considering that every
> CEP operator creates a thread, will this work in production with so many
> threads per task slot? Does the CEP library support combining multiple
> patterns in a single operator/thread?
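
On the last question in the quoted issue, one way to picture "many rules in a single operator" is a single stateful function that checks every rule against each incoming event and tracks per-org progress itself. The following is only a minimal plain-Java sketch of that idea under stated assumptions; it is not a Flink CEP API. `SimpleRule`, `MultiRuleMatcher`, and `onEvent` are hypothetical names, the optional "middle" step of the original pattern is left out, and the matching semantics (each rule fires once per start event and then re-arms) are simpler than what CEP provides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical rule: fires when an event with process `end` follows one with process `start`. */
final class SimpleRule {
    final String name;
    final String start;
    final String end;

    SimpleRule(String name, String start, String end) {
        this.name = name;
        this.start = start;
        this.end = end;
    }
}

/** Evaluates all rules in one pass per event instead of using one operator per rule. */
final class MultiRuleMatcher {
    private final List<SimpleRule> rules;
    // orgId -> names of rules whose start event has already been seen for that org
    private final Map<String, Set<String>> startedByOrg = new HashMap<>();

    MultiRuleMatcher(List<SimpleRule> rules) {
        this.rules = rules;
    }

    /** Feeds one event (orgId, process) and returns the alerts it completes. */
    List<String> onEvent(String orgId, String process) {
        Set<String> started = startedByOrg.computeIfAbsent(orgId, k -> new HashSet<>());
        List<String> alerts = new ArrayList<>();
        for (SimpleRule rule : rules) {
            if (started.contains(rule.name) && process.equals(rule.end)) {
                alerts.add(rule.name + "@" + orgId);
                started.remove(rule.name); // re-arm the rule for this org
            } else if (process.equals(rule.start)) {
                started.add(rule.name);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        MultiRuleMatcher matcher = new MultiRuleMatcher(List.of(
                new SimpleRule("rule0", "process1", "process3"),
                new SimpleRule("rule500", "process4", "process6")));
        matcher.onEvent("orgA", "process1");
        matcher.onEvent("orgA", "process2");
        System.out.println(matcher.onEvent("orgA", "process3"));
    }
}
```

In a Flink job, the per-org map would presumably be replaced by keyed state inside one keyed function, so the number of operators stays constant as the rule list grows. Whether that trade-off fits a given rule set is a design decision this sketch does not settle.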



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
