Thomas Wozniakowski created FLINK-10960:
-------------------------------------------

             Summary: CEP: Job Failure when .times(2) is used
                 Key: FLINK-10960
                 URL: https://issues.apache.org/jira/browse/FLINK-10960
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.6.2
            Reporter: Thomas Wozniakowski


Hi Guys,

Encountered a strange one today. We use the CEP library in a configurable way: we plug a config file into the Flink job JAR, and it programmatically sets up a bunch of CEP operators matching the config file.

I hit a strange bug while testing with some artificially low numbers in our testing environment today. The CEP code we're using (modified slightly) is:

{{
Pattern.begin(EVENT_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
        .times(config.getNumberOfUniqueEvents())
        .where(uniquenessCheckOnAlreadyMatchedEvents())
        .within(seconds(config.getWithinSeconds()));
}}

With {{ numberOfUniqueEvents: 2 }}, I started seeing the following error, which killed the job whenever a match was detected:

{quote}
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: State eventSequence:2 does not exist in the NFA.
NFA has states [
Final State $endState$ [ ]),
Normal State eventSequence [ StateTransition(TAKE, from eventSequenceto $endState$, with condition), StateTransition(IGNORE, from eventSequenceto purchaseSequence, with condition), ]),
Start State purchaseSequence:0 [ StateTransition(TAKE, from eventSequence:0to purchaseSequence, with condition), ])]
	at org.apache.flink.cep.nfa.NFA.isStartState(NFA.java:144)
	at org.apache.flink.cep.nfa.NFA.isStateTimedOut(NFA.java:270)
	at org.apache.flink.cep.nfa.NFA.advanceTime(NFA.java:244)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.advanceTime(AbstractKeyedCEPPatternOperator.java:389)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:293)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
{quote}

Changing the config to {{ numberOfUniqueEvents: 3 }} fixed the problem. Changing it back to 2 brought the problem back. It seems to be specifically related to the value of 2.

This is not a blocking issue for me because we typically use much higher numbers than this in production anyway, but I figured you guys might want to know about it.

Let me know if you need any more information.

Tom



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)