Can you provide us with the implementation of your Event and IoTEvent
classes?
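
In the meantime, since the exception text itself points at equals() and hashCode(), here is a minimal sketch of what a consistent pair could look like. SampleEvent and its three fields are made up for illustration; they are assumptions, not your actual Event or IoTEvent classes. The idea, as far as the message suggests, is that a buffered event is looked up again by value, so a second copy of the same record (for example after deserialization) has to compare equal to the first and produce the same hash.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for an event class; the name and fields are assumptions.
public class SampleEvent {
    private final String deviceId;
    private final long timestamp;
    private final int motionDir;

    public SampleEvent(String deviceId, long timestamp, int motionDir) {
        this.deviceId = deviceId;
        this.timestamp = timestamp;
        this.motionDir = motionDir;
    }

    // Value-based equality over every field that identifies the event.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        SampleEvent that = (SampleEvent) o;
        return timestamp == that.timestamp
                && motionDir == that.motionDir
                && Objects.equals(deviceId, that.deviceId);
    }

    // Must use the same fields as equals() so equal events hash alike.
    @Override
    public int hashCode() {
        return Objects.hash(deviceId, timestamp, motionDir);
    }

    public static void main(String[] args) {
        SampleEvent original = new SampleEvent("dev-1", 1542248971585L, 1);
        // A second instance with the same field values, e.g. a deserialized copy.
        SampleEvent copy = new SampleEvent("dev-1", 1542248971585L, 1);

        Map<SampleEvent, String> buffer = new HashMap<>();
        buffer.put(original, "first event");

        System.out.println(original.equals(copy));    // true
        System.out.println(buffer.containsKey(copy)); // true
    }
}
```

If your classes rely on the default Object.equals(), or if a field used in hashCode() changes after the event has been stored, a lookup like the one in main() would fail, which is why seeing the real implementations would help.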
On 15.11.2018 06:10, Steve Bistline wrote:
Any thoughts on where to start with this error would be appreciated.
Caused by: java.lang.IllegalStateException: Could not find previous entry with key: first event, value: {"DEVICE_ID":f8a395a0-d3e2-11e8-b050-9779854d8172,"TIME_STAMP":11/15/2018 02:29:30.343 am,"TEMPERATURE":0.0,"HUMIDITY":"0.0","LIGHT_WHITE":0.0,"PROCX":0.0,"MOTION_DIRECTION":0,"MOTION_SPEED":0,"MOOD_STATE":0,"VISION_PEOPLE":0,"AUDIO1":0.0} and timestamp: 1542248971585. This can indicate that either you did not implement the equals() and hashCode() methods of your input elements properly or that the element belonging to that entry has been already pruned.
=====================================================
CODE HERE
=====================================================
//kinesisConsumerConfig.list(System.out);

// Consume the data streams from AWS Kinesis stream
DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
        pt.getRequired("stream"),
        new EventSchema(),
        kinesisConsumerConfig))
        .name("Kinesis Stream Consumer");

System.out.printf("Print dataStream\n");
//dataStream.print();

DataStream<Event> kinesisStream = dataStream
        .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
        .map(event -> (IoTEvent) event);

// Prints the mapped records from the Kinesis stream
//kinesisStream.print();
//System.out.printf("Print kinesisStream\n");

Pattern<Event, ?> pattern = Pattern
        .<Event> begin("first event")
        .subtype(IoTEvent.class)
        .where(new IterativeCondition<IoTEvent>() {
            private static final long serialVersionUID = -6301755149429716724L;

            @Override
            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                PatternConstants.MOTION_FIRST = value.getMotionDir();
                return value.getMotionDir() != PatternConstants.MOTION_NA;
            }
        })
        .next("second")
        .subtype(IoTEvent.class)
        .where(new IterativeCondition<IoTEvent>() {
            private static final long serialVersionUID = 2392863109523984059L;

            @Override
            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                return value.getMotionDir() != PatternConstants.MOTION_NA
                        && value.getMotionDir() != PatternConstants.MOTION_FIRST;
            }
        })
        .next("third")
        .subtype(IoTEvent.class)
        .where(new IterativeCondition<IoTEvent>() {
            private static final long serialVersionUID = 2392863109523984059L;

            @Override
            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                return value.getMotionDir() != PatternConstants.MOTION_NA
                        && value.getMotionDir() == PatternConstants.MOTION_FIRST;
            }
        })
        .next("fourth")
        .subtype(IoTEvent.class)
        .where(new IterativeCondition<IoTEvent>() {
            private static final long serialVersionUID = 2392863109523984059L;

            @Override
            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                return value.getMotionDir() != PatternConstants.MOTION_NA
                        && value.getMotionDir() != PatternConstants.MOTION_FIRST;
            }
        })
        .within(Time.seconds(10));