Hi,

Which version of Flink do you use? I would recommend upgrading to 1.6.2, for which we reworked the internal structures, so this problem should not occur.

Best,
Dawid

On 19/11/2018 08:30, Chesnay Schepler wrote:
> @klou @dawid What is the most likely cause if this is not caused by an
> improper equals()/hashCode() implementation?
>
> On 16.11.2018 19:39, Steve Bistline wrote:
>> Implemented hashcode() on both the DEVICE_ID and the MOTION_DIRECTION
>> ( the pattern is built around this one ). Still giving me the
>> following error:
>>
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: Could not find previous entry
>> with key: first event, value:
>> {"DEVICE_ID":7435b060-d2fb-11e8-8da5-9779854d8172,"TIME_STAMP":11/16/2018 06:34:33.994 pm,"TEMPERATURE":0.0,"HUMIDITY":"0.0","LIGHT_WHITE":0.0,"PROCX":0.0,"MOTION_DIRECTION":0,"MOTION_SPEED":0,"MOOD_STATE":0,"VISION_PEOPLE":0,"AUDIO1":0.0}
>> and timestamp: 1542393274928. This can indicate that either you did not
>> implement the equals() and hashCode() methods of your input elements
>> properly or that the element belonging to that entry has been already pruned.
>>     at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:107)
>>     at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:566)
>>     at org.apache.flink.cep.nfa.NFA.process(NFA.java:252)
>>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:332)
>>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:235)
>>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>     at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
>>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:234)
>>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>>     ... 7 more
>>
>> On Thu, Nov 15, 2018 at 8:13 AM Chesnay Schepler <ches...@apache.org
>> <mailto:ches...@apache.org>> wrote:
>>
>>     Does the issue persist if you implement hashCode() in IoTEvent
>>     like this:
>>
>>     @Override
>>     public int hashCode() {
>>         return this.DEVICE_ID.hashCode();
>>     }
>>
>>
>>     On 15.11.2018 13:58, Steve Bistline wrote:
>>> Sure, here it is.
>>>
>>> Thank you so very much for having a look at this.
>>>
>>> Steve
>>>
>>>
>>> On Thu, Nov 15, 2018 at 4:18 AM Chesnay Schepler
>>> <ches...@apache.org <mailto:ches...@apache.org>> wrote:
>>>
>>>     Can you provide us with the implementation of your Event and
>>>     IoTEvent classes?
>>>
>>>     On 15.11.2018 06:10, Steve Bistline wrote:
>>>> Any thoughts on where to start with this error would be
>>>> appreciated.
>>>>
>>>> Caused by: java.lang.IllegalStateException: Could not find
>>>> previous entry with key: first event, value:
>>>> {"DEVICE_ID":f8a395a0-d3e2-11e8-b050-9779854d8172,"TIME_STAMP":11/15/2018 02:29:30.343 am,"TEMPERATURE":0.0,"HUMIDITY":"0.0","LIGHT_WHITE":0.0,"PROCX":0.0,"MOTION_DIRECTION":0,"MOTION_SPEED":0,"MOOD_STATE":0,"VISION_PEOPLE":0,"AUDIO1":0.0}
>>>> and timestamp: 1542248971585. This can indicate that either you did not
>>>> implement the equals() and hashCode() methods of your input elements
>>>> properly or that the element belonging to that entry has been already
>>>> pruned.
>>>> =====================================================
>>>> CODE HERE
>>>> =====================================================
>>>> //kinesisConsumerConfig.list(System.out);
>>>>
>>>> // Consume the data streams from AWS Kinesis stream
>>>> DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
>>>>         pt.getRequired("stream"),
>>>>         new EventSchema(),
>>>>         kinesisConsumerConfig))
>>>>         .name("Kinesis Stream Consumer");
>>>>
>>>> System.out.printf("Print dataStream\n");
>>>> //dataStream.print();
>>>>
>>>> DataStream<Event> kinesisStream = dataStream
>>>>         .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
>>>>         .map(event -> (IoTEvent) event);
>>>>
>>>> // Prints the mapped records from the Kinesis stream
>>>> //kinesisStream.print();
>>>> //System.out.printf("Print kinesisStream\n");
>>>>
>>>> Pattern<Event, ?> pattern = Pattern
>>>>         .<Event> begin("first event").subtype(IoTEvent.class)
>>>>         .where(new IterativeCondition<IoTEvent>() {
>>>>             private static final long serialVersionUID = -6301755149429716724L;
>>>>
>>>>             @Override
>>>>             public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>>>>                 PatternConstants.MOTION_FIRST = value.getMotionDir();
>>>>                 return value.getMotionDir() != PatternConstants.MOTION_NA;
>>>>             }
>>>>         })
>>>>         .next("second")
>>>>         .subtype(IoTEvent.class)
>>>>         .where(new IterativeCondition<IoTEvent>() {
>>>>             private static final long serialVersionUID = 2392863109523984059L;
>>>>
>>>>             @Override
>>>>             public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>>>>                 return value.getMotionDir() != PatternConstants.MOTION_NA
>>>>                         && value.getMotionDir() != PatternConstants.MOTION_FIRST;
>>>>             }
>>>>         })
>>>>         .next("third")
>>>>         .subtype(IoTEvent.class)
>>>>         .where(new IterativeCondition<IoTEvent>() {
>>>>             private static final long serialVersionUID = 2392863109523984059L;
>>>>
>>>>             @Override
>>>>             public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>>>>                 return value.getMotionDir() != PatternConstants.MOTION_NA
>>>>                         && value.getMotionDir() == PatternConstants.MOTION_FIRST;
>>>>             }
>>>>         })
>>>>         .next("fourth")
>>>>         .subtype(IoTEvent.class)
>>>>         .where(new IterativeCondition<IoTEvent>() {
>>>>             private static final long serialVersionUID = 2392863109523984059L;
>>>>
>>>>             @Override
>>>>             public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>>>>                 return value.getMotionDir() != PatternConstants.MOTION_NA
>>>>                         && value.getMotionDir() != PatternConstants.MOTION_FIRST;
>>>>             }
>>>>         })
>>>>         .within(Time.seconds(10));
>>>
>>
>
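
For reference, the exception message in the quoted thread asks for equals() and hashCode() to be implemented consistently on the input elements. The actual IoTEvent class is not shown in this thread, so the following is only a minimal sketch of that contract: the class name IoTEventSketch and the fields deviceId and motionDirection are hypothetical stand-ins for DEVICE_ID and MOTION_DIRECTION, and the choice of which fields define equality is an assumption.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for IoTEvent; the real class is not shown in the thread.
final class IoTEventSketch {
    private final String deviceId;      // assumed counterpart of DEVICE_ID
    private final int motionDirection;  // assumed counterpart of MOTION_DIRECTION

    IoTEventSketch(String deviceId, int motionDirection) {
        this.deviceId = deviceId;
        this.motionDirection = motionDirection;
    }

    // Two events are equal when all identifying fields match.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof IoTEventSketch)) {
            return false;
        }
        IoTEventSketch other = (IoTEventSketch) o;
        return motionDirection == other.motionDirection
                && Objects.equals(deviceId, other.deviceId);
    }

    // hashCode() uses exactly the fields compared in equals(),
    // so equal events always produce the same hash.
    @Override
    public int hashCode() {
        return Objects.hash(deviceId, motionDirection);
    }
}

class IoTEventSketchDemo {
    public static void main(String[] args) {
        IoTEventSketch a = new IoTEventSketch("7435b060", 1);
        IoTEventSketch b = new IoTEventSketch("7435b060", 1);

        // Equal events collapse to a single entry in a hash-based collection.
        Set<IoTEventSketch> seen = new HashSet<>();
        seen.add(a);
        seen.add(b);
        System.out.println(a.equals(b) + " " + seen.size());
    }
}
```

Overriding only one of the two methods, or hashing a field that equals() ignores, breaks lookups in hash-based structures. Whether that is the cause here is still open in the thread, since the error persisted after hashCode() was added.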