Hi Vishal,

Thanks for reporting this.
There is no scheduled release for 1.4.1 yet, but I’ve just started a thread to 
track the remaining issues, so hopefully soon.
Could you quickly reply on that thread [1] also so we can more easily keep 
track of this?

Cheers,
Gordon

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-4-1-td20731.html


On 15 January 2018 at 6:49:33 PM, Vishal Santoshi (vishal.santo...@gmail.com) 
wrote:

This issue has a possible fix in 1.4.1.

We have been waiting for
https://issues.apache.org/jira/browse/FLINK-8226, which looks like a potential fix.



A simple CEP loop pattern  

private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern =
    Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
        .followedBy("middle").where(checkStatusOn).times(2)
        .next("end").where(checkStatusOn).within(Time.minutes(5));

I see failures.  

SimpleBinaryEvent is  

import java.io.Serializable;

public class SimpleBinaryEvent implements Serializable {

    private int id;
    private int sequence;
    private boolean status;
    private long time;

    public SimpleBinaryEvent(int id, int sequence, boolean status, long time) {
        this.id = id;
        this.sequence = sequence;
        this.status = status;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public int getSequence() {
        return sequence;
    }

    public boolean isStatus() {
        return status;
    }

    public long getTime() {
        return time;
    }

    // Value equality over all four fields.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SimpleBinaryEvent that = (SimpleBinaryEvent) o;

        if (getId() != that.getId()) return false;
        if (isStatus() != that.isStatus()) return false;
        if (getSequence() != that.getSequence()) return false;
        return getTime() == that.getTime();
    }

    // Hash over the same four fields used in equals().
    @Override
    public int hashCode() {
        //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
        int result = getId();
        result = 31 * result + (isStatus() ? 1 : 0);
        result = 31 * result + getSequence();
        result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
        return result;
    }

    @Override
    public String toString() {
        return "SimpleBinaryEvent{" +
            "id='" + id + '\'' +
            ", status=" + status +
            ", sequence=" + sequence +
            ", time=" + time +
            '}';
    }

}

failure cause:  

Caused by: java.lang.Exception: Could not materialize checkpoint 2 for  
operator KeyedCEPPatternOperator -> Map (1/1).  
... 6 more  
Caused by: java.util.concurrent.ExecutionException:  
java.lang.IllegalStateException: Could not find id for entry:  
SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1',  
status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....  

I am sure equals() and hashCode() are implemented the way they should
be. I have also tried Objects.hash() (the commented-out line above). In
other runs I have hit a circular reference (and thus a StackOverflowError)
in SharedBuffer.toString(), which again points to issues with references
(equality and so on). With checkpointing turned off, the job works as
expected. I am running on a local cluster. Is CEP production ready?
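
To back up the equals()/hashCode() claim outside of Flink, here is a minimal
sketch that round-trips an event through serialization and checks that the
copy is still equal to the original with the same hash code. The names
EqualityContractCheck, Event, roundTrip and contractHolds are hypothetical,
Event is a trimmed copy of SimpleBinaryEvent, and plain Java serialization
is only a stand-in: Flink's own serializers may behave differently, so a
passing check here does not rule out a problem inside the CEP operator.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class EqualityContractCheck {

    // Trimmed copy of SimpleBinaryEvent with the same equals()/hashCode() logic.
    public static class Event implements Serializable {
        private final int id;
        private final int sequence;
        private final boolean status;
        private final long time;

        public Event(int id, int sequence, boolean status, long time) {
            this.id = id;
            this.sequence = sequence;
            this.status = status;
            this.time = time;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Event that = (Event) o;
            return id == that.id && status == that.status
                && sequence == that.sequence && time == that.time;
        }

        @Override
        public int hashCode() {
            int result = id;
            result = 31 * result + (status ? 1 : 0);
            result = 31 * result + sequence;
            result = 31 * result + (int) (time ^ (time >>> 32));
            return result;
        }
    }

    // Serializes and deserializes the event with plain Java serialization
    // (an assumption: this is not the serializer Flink uses for checkpoints).
    public static Event roundTrip(Event event) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(event);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Event) in.readObject();
            }
        } catch (Exception e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    // True when both objects are equal in both directions and hash alike.
    public static boolean contractHolds(Event a, Event b) {
        return a.equals(b) && b.equals(a) && a.hashCode() == b.hashCode();
    }

    public static void main(String[] args) {
        Event original = new Event(1, 95, true, 1505503380000L);
        Event copy = roundTrip(original);
        System.out.println("distinct instance: " + (copy != original));
        System.out.println("contract holds: " + contractHolds(original, copy));
    }
}
```

The sample values (id 1, sequence 95, status true, time 1505503380000) are
taken from the entry in the stack trace above.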

I am using Flink 1.3.2.
