Hi everyone, I have a big stream A that is filtered by flags from a small stream B and then unioned with another stream C to form the input for my CEP. As the three streams A, B, and C are all keyed, I expected the output stream resulting from connecting/unioning them to be keyed as well, so I used /reinterpretAsKeyedStream/ before passing it to CEP. With this, I get an /IllegalArgumentException/ (full stack trace below). If I use a parallelism of 1, or if I don't use reinterpretAsKeyedStream (and call /keyBy/ manually instead), the exception does not occur.
I don't know how to debug this error, and not sure whether I should use keyed streams with CEP? Thanks and best regards, Averell My code: / val cepInput = streamA.keyBy(r => (r.id1, r.id2)) .connect(streamB.keyBy(r => (r.id1, r.id2))) .flatMap(new MyCandidateFilterFunction()) .union(streamC.keyBy(r => (r.id1, r.id2))) val cepOutput = MyCEP(new DataStreamUtils(cepInput).reinterpretAsKeyedStream(r => (r.id1, r.id2)), counter1, counter2, threshold1, threshold2) object MyCEP { def apply(input: KeyedStream[Event, _], longPeriod: Int, threshold: Int, shortPeriod: Int): DataStream[Event] = { val patternLineIsUp = Pattern.begin[Event]("period1") .where((value: event, ctx: CepContext[Event]) => accSum(_.counter, Seq("period1"), value, ctx) < threshold) .times(longPeriod - shortPeriod).consecutive() .next("period2") .where((value: Event, ctx: CepContext[Event]) => accSum(_.counter, Seq("period1", "period2"), value, ctx) < threshold && value.status == "up") .times(shortPeriod).consecutive() collectPattern(input, patternLineIsUp) } private def accSum(f: Event => Long, keys: Seq[String], currentEvent: Event, ctx: CepContext[Event]): Long = { keys.map(key => ctx.getEventsForPattern(key).map(f).sum).sum + f(currentEvent) } private def collectPattern(inputStream: KeyedStream[Event, _], pattern: Pattern[Event, Event]): DataStream[Event] = CEP.pattern(inputStream, pattern) .process((map: util.Map[String, util.List[Event]], ctx: PatternProcessFunction.Context, collector: Collector[Event]) => { val records = map.get("period2") collector.collect(records.get(records.size() - 1)) }) }/ The exception: /Exception in thread "main" 12:43:13,103 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service. org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654) at com.mycompany.StreamingJob$.main(Streaming.scala:440) at com.mycompany.StreamingJob.main(Streaming.scala) Caused by: java.lang.IllegalArgumentException at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:215) at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285) at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) / -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/