[ https://issues.apache.org/jira/browse/FLINK-22803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17353512#comment-17353512 ]
Dawid Wysakowicz commented on FLINK-22803:
------------------------------------------

I replied in the ML. Please do not use JIRA for questions.

> Running multiple CEP patterns
> -----------------------------
>
>                 Key: FLINK-22803
>                 URL: https://issues.apache.org/jira/browse/FLINK-22803
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.13.0
>            Reporter: Tejas Budukh
>            Priority: Major
>
> Hi,
> I've tried to get help with this error on Slack, the user mailing list, and
> Stack Overflow, but no one has responded. I don't know how else to get help,
> hence this ticket.
> We are running into errors when running multiple CEP patterns. Here’s our
> use case:
> We are planning to build a rule-based engine on top of Flink with a huge
> number of rules and are doing a POC for it. For the POC we have around 1000
> pattern-based rules, which we translate into CEP patterns and run on a keyed
> stream of event data to detect patterns. We partition the stream by orgId,
> and each rule needs to be run for each org.
> Here’s the code we’ve written to implement that:
>
>     DataStream<Event> partitionedInput =
>         eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid);
>     List<Rule> ruleList = new ArrayList<>();
>     for (int i = 0; i < 100; i++) {
>       ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
>       ruleList.add(
>           new Rule("rule" + (i + 500), "process4", "process5", "process6"));
>     }
>     for (Rule rule : ruleList) {
>       String st = rule.getStart();
>       String mi = rule.getMid();
>       String en = rule.getEnd();
>       String nm = rule.getName();
>       Pattern<Event, ?> pattern =
>           Pattern.begin(
>               Pattern.<Event>begin("start")
>                   .where(
>                       new SimpleCondition<Event>() {
>                         @Override
>                         public boolean filter(Event value) throws Exception {
>                           return value.getProcess().equals(st);
>                         }
>                       })
>                   .followedBy("middle")
>                   .where(
>                       new SimpleCondition<Event>() {
>                         @Override
>                         public boolean filter(Event event) {
>                           return !event.getProcess().equals(mi);
>                         }
>                       })
>                   .optional()
>                   .followedBy("end")
>                   .where(
>                       new SimpleCondition<Event>() {
>                         @Override
>                         public boolean filter(Event event) {
>                           return event.getProcess().equals(en);
>                         }
>                       }));
>       PatternStream<Event> patternStream = CEP.pattern(partitionedInput,
>           pattern);
>       DataStream<String> alerts =
>           patternStream.process(
>               new PatternProcessFunction<Event, String>() {
>                 @Override
>                 public void processMatch(
>                     Map<String, List<Event>> map, Context context,
>                     Collector<String> collector)
>                     throws Exception {
>                   Event start = map.containsKey("start") ?
>                       map.get("start").get(0) : null;
>                   Event middle = map.containsKey("middle") ?
>                       map.get("middle").get(0) : null;
>                   Event end = map.containsKey("end") ? map.get("end").get(0) :
>                       null;
>                   StringJoiner joiner = new StringJoiner(",");
>                   joiner
>                       .add("Rule : " + nm + " ")
>                       .add((start == null ? "" : start.getId()))
>                       .add((middle == null ? "" : middle.getId()))
>                       .add((end == null ? "" : end.getId()));
>                   collector.collect(joiner.toString());
>                 }
>               });
>       alerts.print();
>     }
>
> We tried to run this code on a Flink cluster with 1 task manager and 4
> task slots, and the task manager crashed with the following error:
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>     at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
>     at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:910)
>     at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:623)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
>     at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
>     at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
>     at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
>     at akka.dispatch.OnComplete.internal(Future.scala:263)
>     at akka.dispatch.OnComplete.internal(Future.scala:261)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
>     at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
>     at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>     at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>     at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
>     at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>     at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:599)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     ... 1 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@192.168.0.4:52041/user/rpc/taskmanager_0#-1397184270]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>     at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
>     at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>     at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
>     ... 1 more
>
> Can somebody help with this? Why is this code failing? Is our approach
> scalable, or is there a better way of doing this? Considering that every
> CEP operator creates a thread, will this work in production with so many
> threads per task slot? Does the CEP library support combining multiple
> patterns in a single operator/thread?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
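On the last question (combining many patterns in one operator): the job above creates one CEP operator per rule, so the job graph grows with the rule count. One alternative is to evaluate every rule inside a single per-key pass. The plain-Java sketch below illustrates that idea for rules shaped like the ones in the report (start, optional middle, end). It does not use Flink and is not part of the Flink CEP API; the class names `MultiRuleMatcher`, `Rule`, `Event`, the method `onEvent`, and the simplified matching semantics are all hypothetical assumptions. In particular, it keeps at most one open match per rule and key and emits it at the first end event, which is deliberately simpler than CEP's `followedBy` semantics (those can emit several overlapping matches). In a real job, logic like this could be placed in a single keyed function holding per-key state, but that wiring is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: evaluates many "start -> optional middle -> end" rules
// in one pass per key instead of one CEP operator per rule.
// Simplified semantics (NOT equivalent to Flink CEP followedBy): at most one
// open match per rule and key; it is emitted at the first end event, then reset.
public class MultiRuleMatcher {

    // Hypothetical rule type mirroring the getters used in the report.
    // Assumes rule names are unique, as with the report's generated names.
    public static final class Rule {
        final String name, start, mid, end;
        public Rule(String name, String start, String mid, String end) {
            this.name = name; this.start = start; this.mid = mid; this.end = end;
        }
    }

    // Hypothetical event type carrying only the fields the rules inspect.
    public static final class Event {
        final String id, orgId, process;
        public Event(String id, String orgId, String process) {
            this.id = id; this.orgId = orgId; this.process = process;
        }
    }

    // In-progress match of one rule for one key.
    private static final class Partial {
        final String startId;
        String middleId; // stays null if no event satisfied the middle condition
        Partial(String startId) { this.startId = startId; }
    }

    private final List<Rule> rules;
    // key (orgId) -> rule name -> open partial match; plays the role of per-key state
    private final Map<String, Map<String, Partial>> state = new HashMap<>();

    public MultiRuleMatcher(List<Rule> rules) {
        this.rules = new ArrayList<>(rules);
    }

    // Feeds one event through every rule and returns the completed matches,
    // formatted like the report's StringJoiner output.
    public List<String> onEvent(Event e) {
        Map<String, Partial> perKey = state.computeIfAbsent(e.orgId, k -> new HashMap<>());
        List<String> alerts = new ArrayList<>();
        for (Rule r : rules) {
            Partial p = perKey.get(r.name);
            if (p == null) {
                // No open match: only a start event opens one.
                if (e.process.equals(r.start)) {
                    perKey.put(r.name, new Partial(e.id));
                }
            } else if (e.process.equals(r.end)) {
                // End event completes the match: emit it and reset this rule for this key.
                alerts.add(String.join(",", "Rule : " + r.name + " ", p.startId,
                        p.middleId == null ? "" : p.middleId, e.id));
                perKey.remove(r.name);
            } else if (p.middleId == null && !e.process.equals(r.mid)) {
                // Optional middle: remember the first event whose process is not `mid`.
                p.middleId = e.id;
            }
        }
        return alerts;
    }
}
```

With this shape, adding rules only grows the rule list, not the job graph, so the number of operators stays constant; the cost moves into per-event work that grows linearly with the number of rules. CEP features such as `within(...)` time bounds, looping patterns, and after-match skip strategies are not reproduced and would have to be reimplemented.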