When checkpointing is turned on a simple CEP loop pattern private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn) .followedBy("middle").where(checkStatusOn).times(2) .next("end").where(checkStatusOn).within(Time.minutes(5))
I see failures. SimpleBinaryEvent is public class SimpleBinaryEvent implements Serializable { private int id; private int sequence; private boolean status; private long time; public SimpleBinaryEvent(int id, int sequence, boolean status , long time) { this.id = id; this.sequence = sequence; this.status = status; this.time = time; } public int getId() { return id; } public int getSequence() { return sequence; } public boolean isStatus() { return status; } public long getTime() { return time; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SimpleBinaryEvent that = (SimpleBinaryEvent) o; if (getId() != that.getId()) return false; if (isStatus() != that.isStatus()) return false; if (getSequence() != that.getSequence()) return false; return getTime() == that.getTime(); } @Override public int hashCode() { //return Objects.hash(getId(),isStatus(), getSequence(),getTime()); int result = getId(); result = 31 * result + (isStatus() ? 1 : 0); result = 31 * result + getSequence(); result = 31 * result + (int) (getTime() ^ (getTime() >>> 32)); return result; } @Override public String toString() { return "SimpleBinaryEvent{" + "id='" + id + '\'' + ", status=" + status + ", sequence=" + sequence + ", time=" + time + '}'; } } failure cause: Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1). ... 6 more Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),.... I am sure I have the equals() and hashCode() implemented the way it should be. I have tried the Objects.hashCode too. In other instances I have had CircularReference ( and thus stackOverflow ) on SharedBuffer.toString(), which again points to issues with references ( equality and what not ). Without checkpointing turned on it works as expected. I am running on a local cluster. Is CEP production ready ? I am using 1.3.2 Flink