Hi, We are running into errors when running multiple CEP patterns. Here’s our use-case : We are planning to build a rule based engine on top of flink with huge number of rules and doing a POC for that. For POC we have around 1000 pattern based rules which we are translating into CEP patterns and running these rules on a keyed stream of events data to detect patterns. We are partitioning the stream by orgId and each rule needs to be run into each org. Here’s the code we’ve written to implement that : /DataStream<Event> eventStream = null; DataStream<Event> partitionedInput = eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid); List<Rule> ruleList = new ArrayList<>(); for (int i = 0; i < 100; i++) { ruleList.add(new Rule("rule" + i, "process1", "process2", "process3")); ruleList.add( new Rule("rule" + (i + 500), "process4", "process5", "process6")); } for (Rule rule : ruleList) { String st = rule.getStart(); String mi = rule.getMid(); String en = rule.getEnd(); String nm = rule.getName(); Pattern<Event, ?> pattern = Pattern.begin( Pattern.<Event>begin("start") .where( new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getProcess().equals(st); } }) .followedBy("middle") .where( new SimpleCondition<Event>() { @Override public boolean filter(Event event) { return !event.getProcess().equals(mi); } }) .optional() .followedBy("end") .where( new SimpleCondition<Event>() { @Override public boolean filter(Event event) { return event.getProcess().equals(en); } })); PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern); DataStream<String> alerts = patternStream.process( new PatternProcessFunction<Event, String>() { @Override public void processMatch( Map<String, List<Event>> map, Context context, Collector<String> collector) throws Exception { Event start = map.containsKey("start") ? map.get("start").get(0) : null; Event middle = map.containsKey("middle") ? map.get("middle").get(0) : null; Event end = map.containsKey("end") ? map.get("end").get(0) : null; StringJoiner joiner = new StringJoiner(","); joiner .add("Rule : " + nm + " ") .add((start == null ? "" : start.getId())) .add((middle == null ? "" : middle.getId())) .add((end == null ? "" : end.getId())); collector.collect(joiner.toString()); } }); alerts.print();/ We tried to run this code on the flink cluster with 1 task manager with 4 task slots and the task manager crashed with the error : /Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677) at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079) at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:910) at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:623) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out. at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367) at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376) at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079) at akka.dispatch.OnComplete.internal(Future.scala:263) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870) at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279) at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283) at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out. at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60) at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:599) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1 more Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@192.168.0.4:52041/user/rpc/taskmanager_0#-1397184270]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635) at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870) at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279) at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283) at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235) ... 1 more/
Can somebody help with this ? Why is this code failing ? Is out approach scalable or Is there any better way of doing this ? Considering that every CEP operator creates a thread, will this work in production with so many threads per task slot ? Does CEP library support combining multiple patterns in a single operator/thread ? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/