Hi Vishal, I think it might be due to this bug: https://issues.apache.org/jira/browse/FLINK-8226 It was merged for 1.4.1 and 1.5.0. Could you check with this changes applied? Would be really helpful. If the error still persists could you file a jira?
Regards Dawid > On 11 Jan 2018, at 19:49, Vishal Santoshi <vishal.santo...@gmail.com> wrote: > > When checkpointing is turned on a simple CEP loop pattern > > private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = > Pattern.<Tuple2<Integer, > SimpleBinaryEvent>>begin("start").where(checkStatusOn) > .followedBy("middle").where(checkStatusOn).times(2) > .next("end").where(checkStatusOn).within(Time.minutes(5)) > > I see failures. > > SimpleBinaryEvent is > > public class SimpleBinaryEvent implements Serializable { > > private int id; > private int sequence; > private boolean status; > private long time; > > public SimpleBinaryEvent(int id, int sequence, boolean status , long time) { > > this.id > = id; > this.sequence = sequence; > this.status = status; > this.time = time; > } > public int getId() { > return id; > } > public int getSequence() { > return sequence; > } > public boolean isStatus() { > return status; > } > public long getTime() { > return time; > } > @Override > public boolean equals(Object o) { > if (this == o) return true; > if (o == null || getClass() != o.getClass()) return false; > > SimpleBinaryEvent that = (SimpleBinaryEvent) o; > > if (getId() != that.getId()) return false; > if (isStatus() != that.isStatus()) return false; > if (getSequence() != that.getSequence()) return false; > return getTime() == that.getTime(); > } > > @Override > public int hashCode() { > //return Objects.hash(getId(),isStatus(), getSequence(),getTime()); > int result = getId(); > result = 31 * result + (isStatus() ? 1 : 0); > result = 31 * result + getSequence(); > result = 31 * result + (int) (getTime() ^ (getTime() >>> 32)); > return result; > } > > @Override > public String toString() { > return "SimpleBinaryEvent{" + > "id='" + id + '\'' + > ", status=" + status + > ", sequence=" + sequence + > ", time=" + time + > '}'; > } > > } > > failure cause: > > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for > operator KeyedCEPPatternOperator -> Map (1/1). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException: Could not find id for entry: > SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, > sequence=95, time=1505503380000}), 1505503380000, 0),.... > > I am sure I have the equals() and hashCode() implemented the way it should > be. I have tried the Objects.hashCode too. In other instances I have had > CircularReference ( and thus stackOverflow ) on SharedBuffer.toString(), > which again points to issues with references ( equality and what not ). > Without checkpointing turned on it works as expected. I am running on a local > cluster. Is CEP production ready ? > > I am using 1.3.2 Flink >
signature.asc
Description: Message signed with OpenPGP