Hi,
Which version of Flink are you using? I would recommend upgrading to 1.6.2,
in which we reworked the internal structures, so this problem should no longer occur.
Best,

Dawid


On 19/11/2018 08:30, Chesnay Schepler wrote:
> @klou @dawid What is the most likely cause if this is not caused by an
> improper equals()/hashCode() implementation?
>
> On 16.11.2018 19:39, Steve Bistline wrote:
>> I implemented hashCode() on both DEVICE_ID and MOTION_DIRECTION
>> (the pattern is built around this one). It is still giving me the
>> following error:
>>
>>
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>      at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>>      at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>      at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>>      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>      at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: Could not find previous entry with key: first event, value: {"DEVICE_ID":7435b060-d2fb-11e8-8da5-9779854d8172,"TIME_STAMP":11/16/2018 06:34:33.994 pm,"TEMPERATURE":0.0,"HUMIDITY":"0.0","LIGHT_WHITE":0.0,"PROCX":0.0,"MOTION_DIRECTION":0,"MOTION_SPEED":0,"MOOD_STATE":0,"VISION_PEOPLE":0,"AUDIO1":0.0} and timestamp: 1542393274928. This can indicate that either you did not implement the equals() and hashCode() methods of your input elements properly or that the element belonging to that entry has been already pruned.
>>      at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:107)
>>      at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:566)
>>      at org.apache.flink.cep.nfa.NFA.process(NFA.java:252)
>>      at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:332)
>>      at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:235)
>>      at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>      at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
>>      at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:234)
>>      at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>      at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
>>      at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>>      ... 7 more
>>
>> On Thu, Nov 15, 2018 at 8:13 AM Chesnay Schepler <ches...@apache.org
>> <mailto:ches...@apache.org>> wrote:
>>
>>     Does the issue persist if you implement hashCode() in IoTEvent
>>     like this:
>>
>>     @Override
>>     public int hashCode() {
>>         return this.DEVICE_ID.hashCode();
>>     }
>>
>>
>>     On 15.11.2018 13:58, Steve Bistline wrote:
>>>     Sure, here it is.  
>>>
>>>     Thank you so very much for having a look at this.
>>>
>>>     Steve
>>>
>>>
>>>     On Thu, Nov 15, 2018 at 4:18 AM Chesnay Schepler
>>>     <ches...@apache.org <mailto:ches...@apache.org>> wrote:
>>>
>>>         Can you provide us with the implementation of your Event and
>>>         IoTEvent classes?
>>>
>>>         On 15.11.2018 06:10, Steve Bistline wrote:
>>>>         Any thoughts on where to start with this error would be
>>>>         appreciated.
>>>>
>>>>         Caused by: java.lang.IllegalStateException: Could not find previous entry with key: first event, value: {"DEVICE_ID":f8a395a0-d3e2-11e8-b050-9779854d8172,"TIME_STAMP":11/15/2018 02:29:30.343 am,"TEMPERATURE":0.0,"HUMIDITY":"0.0","LIGHT_WHITE":0.0,"PROCX":0.0,"MOTION_DIRECTION":0,"MOTION_SPEED":0,"MOOD_STATE":0,"VISION_PEOPLE":0,"AUDIO1":0.0} and timestamp: 1542248971585. This can indicate that either you did not implement the equals() and hashCode() methods of your input elements properly or that the element belonging to that entry has been already pruned.
>>>>         =====================================================
>>>>         CODE HERE
>>>>         =====================================================
>>>>         //kinesisConsumerConfig.list(System.out);
>>>>         // Consume the data streams from AWS Kinesis stream
>>>>         DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
>>>>                        pt.getRequired("stream"), new EventSchema(), kinesisConsumerConfig))
>>>>                        .name("Kinesis Stream Consumer");
>>>>         System.out.printf("Print dataStream\n");
>>>>         //dataStream.print();
>>>>         DataStream<Event> kinesisStream = dataStream
>>>>                        .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
>>>>                        .map(event -> (IoTEvent) event);
>>>>         // Prints the mapped records from the Kinesis stream
>>>>         //kinesisStream.print();
>>>>         //System.out.printf("Print kinesisStream\n");
>>>>         Pattern<Event, ?> pattern = Pattern
>>>>                        .<Event> begin("first event").subtype(IoTEvent.class)
>>>>                        .where(new IterativeCondition<IoTEvent>() {
>>>>                            private static final long serialVersionUID = -6301755149429716724L;
>>>>                            @Override
>>>>                            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>>>>                                PatternConstants.MOTION_FIRST = value.getMotionDir();
>>>>                                return value.getMotionDir() != PatternConstants.MOTION_NA;
>>>>                            }
>>>>                        })
>>>>                        .next("second")
>>>>                        .subtype(IoTEvent.class)
>>>>                        .where(new IterativeCondition<IoTEvent>() {
>>>>                            private static final long serialVersionUID = 2392863109523984059L;
>>>>                            @Override
>>>>                            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>>>>                                return value.getMotionDir() != PatternConstants.MOTION_NA
>>>>                                        && value.getMotionDir() != PatternConstants.MOTION_FIRST;
>>>>                            }
>>>>                        })
>>>>                        .next("third")
>>>>                        .subtype(IoTEvent.class)
>>>>                        .where(new IterativeCondition<IoTEvent>() {
>>>>                            private static final long serialVersionUID = 2392863109523984059L;
>>>>                            @Override
>>>>                            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>>>>                                return value.getMotionDir() != PatternConstants.MOTION_NA
>>>>                                        && value.getMotionDir() == PatternConstants.MOTION_FIRST;
>>>>                            }
>>>>                        })
>>>>                        .next("fourth")
>>>>                        .subtype(IoTEvent.class)
>>>>                        .where(new IterativeCondition<IoTEvent>() {
>>>>                            private static final long serialVersionUID = 2392863109523984059L;
>>>>                            @Override
>>>>                            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>>>>                                return value.getMotionDir() != PatternConstants.MOTION_NA
>>>>                                        && value.getMotionDir() != PatternConstants.MOTION_FIRST;
>>>>                            }
>>>>                        })
>>>>                        .within(Time.seconds(10));
>>>
>>>
>>
>
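
For reference, the exception message quoted above names an inconsistent equals()/hashCode() pair as one possible cause. A minimal sketch of a consistent pair is given here, on the assumption that an event is identified by its device ID, motion direction, and timestamp. The class name IoTEventSketch and its fields are hypothetical stand-ins, since the real IoTEvent class is not shown in this thread. This only covers the equals()/hashCode() cause, not the case where the entry was already pruned.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the IoTEvent class, which is not shown in this thread.
final class IoTEventSketch {
    private final String deviceId;  // assumed to correspond to DEVICE_ID
    private final int motionDir;    // assumed to correspond to MOTION_DIRECTION
    private final long timestamp;   // assumed event timestamp in milliseconds

    IoTEventSketch(String deviceId, int motionDir, long timestamp) {
        this.deviceId = deviceId;
        this.motionDir = motionDir;
        this.timestamp = timestamp;
    }

    int getMotionDir() {
        return motionDir;
    }

    // equals() compares exactly the fields that hashCode() hashes.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof IoTEventSketch)) {
            return false;
        }
        IoTEventSketch other = (IoTEventSketch) o;
        return motionDir == other.motionDir
                && timestamp == other.timestamp
                && Objects.equals(deviceId, other.deviceId);
    }

    // Equal objects must produce equal hash codes, so the same fields are used.
    @Override
    public int hashCode() {
        return Objects.hash(deviceId, motionDir, timestamp);
    }

    public static void main(String[] args) {
        Map<IoTEventSketch, String> seen = new HashMap<>();
        seen.put(new IoTEventSketch("7435b060", 0, 1542393274928L), "first event");
        // A separately constructed but equal event finds the stored entry.
        System.out.println(seen.get(new IoTEventSketch("7435b060", 0, 1542393274928L)));
    }
}
```

If only hashCode() were overridden, two separately constructed events with identical fields would still compare as unequal, so a hash-based lookup would miss the stored entry.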
