Thank you Yun. I haven't tried to follow your guide to verify yet (it would take me some time to work out how to do that). However, I can now confirm that the "union" is the culprit. In the Flink web UI I can see that the edge from streamC to the CEP operator via "union" is a FORWARD link, not a HASH one, which means that the "keyBy" right before the "union" has no effect at all. If I put a placebo "map" between the "keyBy" on streamC and the "union", the problem is solved: .union(streamC.keyBy(r => (r.id1, r.id2)).map(r => r)). A condensed sketch of both variants is below.
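To make it concrete, here is a condensed sketch of the two variants (the stream names, the Event fields and MyCandidateFilterFunction are the placeholders from my original mail quoted below, so please treat it as an illustration rather than my exact job):

    // Variant 1 -- the workaround described above: a "placebo" identity map between the
    // keyBy on streamC and the union, so that the keyBy is not collapsed into a FORWARD
    // edge. The result is still wrapped with reinterpretAsKeyedStream, as in my code below.
    val cepInput = streamA.keyBy(r => (r.id1, r.id2))
      .connect(streamB.keyBy(r => (r.id1, r.id2)))
      .flatMap(new MyCandidateFilterFunction())
      .union(streamC.keyBy(r => (r.id1, r.id2)).map(r => r))

    // Variant 2 -- no reinterpretAsKeyedStream at all: key the unioned stream explicitly,
    // which always gives a HASH edge into the CEP operator and a KeyedStream that can be
    // passed straight to CEP.pattern / MyCEP.
    val cepInputKeyed = streamA.keyBy(r => (r.id1, r.id2))
      .connect(streamB.keyBy(r => (r.id1, r.id2)))
      .flatMap(new MyCandidateFilterFunction())
      .union(streamC)
      .keyBy(r => (r.id1, r.id2))

Variant 2 costs an explicit shuffle, but it does not rely on the input being pre-partitioned exactly as keyBy would do it, which, as far as I understand, is what reinterpretAsKeyedStream requires per [1] below.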
I don't know why "union" behaves like that, though; I could not find it mentioned in any documentation. Thanks a lot for your help.

Regards,
Averell

On Sun, May 5, 2019 at 11:22 PM Yun Tang <myas...@live.com> wrote:

> Hi Averell
>
> I think this is because, after the 'union', the input stream did not
> actually follow the rule that the keys must be pre-partitioned in
> *EXACTLY* the same way Flink's keyBy would partition the data [1]. An
> easy way to verify this is to refer to [2] and check whether each
> sub-task of the unioned stream contains exactly what the downstream task
> contains.
>
> Best
> Yun Tang
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> [2]
> https://github.com/apache/flink/blob/4e505c67542a45c82c763c12099bdfc621c9e476/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java#L223
>
> ------------------------------
> *From:* Averell <lvhu...@gmail.com>
> *Sent:* Sunday, May 5, 2019 16:43
> *To:* user@flink.apache.org
> *Subject:* IllegalArgumentException with CEP & reinterpretAsKeyedStream
>
> Hi everyone,
>
> I have a big stream A that is filtered by flags from a small stream B and
> then unioned with another stream C to become the input for my CEP.
> As the three streams A, B, C are all keyed, I expected that the stream
> resulting from connecting/unioning them would also be keyed, so I used
> reinterpretAsKeyedStream before feeding it into CEP. With this, I got an
> IllegalArgumentException (full stack trace below).
> If I use a parallelism of 1, or if I don't use reinterpretAsKeyedStream
> (and use keyBy manually instead), there is no such exception.
>
> I don't know how to debug this error, and I am not sure whether I should
> use keyed streams with CEP at all.
>
> Thanks and best regards,
> Averell
>
> My code:
>
>     val cepInput = streamA.keyBy(r => (r.id1, r.id2))
>       .connect(streamB.keyBy(r => (r.id1, r.id2)))
>       .flatMap(new MyCandidateFilterFunction())
>       .union(streamC.keyBy(r => (r.id1, r.id2)))
>
>     val cepOutput =
>       MyCEP(new DataStreamUtils(cepInput).reinterpretAsKeyedStream(r => (r.id1, r.id2)),
>             counter1, counter2,
>             threshold1, threshold2)
>
>     object MyCEP {
>       def apply(input: KeyedStream[Event, _],
>                 longPeriod: Int,
>                 threshold: Int,
>                 shortPeriod: Int): DataStream[Event] = {
>
>         val patternLineIsUp =
>           Pattern.begin[Event]("period1")
>             .where((value: Event, ctx: CepContext[Event]) =>
>               accSum(_.counter, Seq("period1"), value, ctx) < threshold)
>             .times(longPeriod - shortPeriod).consecutive()
>             .next("period2")
>             .where((value: Event, ctx: CepContext[Event]) =>
>               accSum(_.counter, Seq("period1", "period2"), value, ctx) < threshold
>                 && value.status == "up")
>             .times(shortPeriod).consecutive()
>
>         collectPattern(input, patternLineIsUp)
>       }
>
>       private def accSum(f: Event => Long, keys: Seq[String],
>                          currentEvent: Event, ctx: CepContext[Event]): Long = {
>         keys.map(key => ctx.getEventsForPattern(key).map(f).sum).sum + f(currentEvent)
>       }
>
>       private def collectPattern(inputStream: KeyedStream[Event, _],
>                                  pattern: Pattern[Event, Event]): DataStream[Event] =
>         CEP.pattern(inputStream, pattern)
>           .process((map: util.Map[String, util.List[Event]],
>                     ctx: PatternProcessFunction.Context,
>                     collector: Collector[Event]) => {
>             val records = map.get("period2")
>             collector.collect(records.get(records.size() - 1))
>           })
>     }
>
> The exception:
>
>     Exception in thread "main" 12:43:13,103 INFO
>       org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped Akka RPC service.
>     org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>         at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>         at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>         at com.mycompany.StreamingJob$.main(Streaming.scala:440)
>         at com.mycompany.StreamingJob.main(Streaming.scala)
>     Caused by: java.lang.IllegalArgumentException
>         at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
>         at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:215)
>         at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
>         at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
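PS: regarding the verification you suggested, I haven't run anything yet, but I guess a pass-through check roughly like the (untested) sketch below would show whether each record arrives at the sub-task that Flink's keyBy would choose. This is not exactly what the test in [2] does; Event, id1/id2 and the key tuple are from my job, while KeyPartitionCheck and the max-parallelism value are just made-up placeholders:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.runtime.state.KeyGroupRangeAssignment

    // Pass-through map: compares the sub-task a record actually arrived at with the
    // sub-task index that keyBy's key-group assignment would compute for its key.
    // maxParallelism must match the job's max parallelism for the comparison to be valid.
    class KeyPartitionCheck(maxParallelism: Int) extends RichMapFunction[Event, Event] {
      override def map(r: Event): Event = {
        val actual = getRuntimeContext.getIndexOfThisSubtask
        val expected = KeyGroupRangeAssignment.assignKeyToParallelOperator(
          (r.id1, r.id2), maxParallelism, getRuntimeContext.getNumberOfParallelSubtasks)
        if (actual != expected)
          println(s"Key (${r.id1}, ${r.id2}) arrived at sub-task $actual, keyBy would route it to $expected")
        r
      }
    }

    // e.g. cepInput.map(new KeyPartitionCheck(128)) just before reinterpretAsKeyedStream,
    // with 128 replaced by whatever max parallelism the job actually uses.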