I have tested against the 1.5 SNAPSHOT (I simply pulled the source code
into my distribution and compiled it into my job jar). Both the test code
and the cluster seem to work OK. I have not tested the "savepoint and
resume" mode, but restore from checkpoint works: I brought the JM down and
restarted it. I still have to sanitize the output, but at least the
exception is no longer thrown.

One thing though, and please confirm:

In CEP it seems that a POJO pushed into the window as part of a pattern match
has to have an "exact" equals/hashCode. In my case I had a custom
equals/hashCode to enable "contains" in a different context, that is, I had
deliberately left an instance variable out of the equals/hashCode
contract. Is that a design decision or a requirement in CEP?
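
To make the question concrete, here is a minimal plain-Java sketch (no Flink
involved; Event and PartialEqualsDemo are made-up names, not my real classes)
of what such a partial equals/hashCode does in any hash-based collection: two
distinct events collapse into a single entry. Whether the CEP buffer actually
keys its entries by value equality is exactly what I am asking, so treat that
part as an assumption and not as a description of the implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class PartialEqualsDemo {

    // Hypothetical event type, used only for illustration.
    static final class Event {
        final int id;
        final long time;
        final String note; // deliberately excluded from equals/hashCode

        Event(int id, long time, String note) {
            this.id = id;
            this.time = time;
            this.note = note;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Event)) return false;
            Event that = (Event) o;
            // 'note' is ignored, so events differing only in 'note' are "equal".
            return id == that.id && time == that.time;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, time);
        }
    }

    // Uses each event as a map key and returns how many distinct keys remain.
    static int distinctKeys(Event... events) {
        Map<Event, Integer> ids = new HashMap<>();
        for (Event e : events) {
            ids.put(e, ids.size());
        }
        return ids.size();
    }

    public static void main(String[] args) {
        Event a = new Event(1, 1505503380000L, "first");
        Event b = new Event(1, 1505503380000L, "second");
        System.out.println(a.equals(b));        // true
        System.out.println(distinctKeys(a, b)); // 1
    }
}
```

If a buffer indexes events like this, the second event silently replaces the
first as a key, and any later lookup that expects two separate entries can
fail.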

Thanks and Regards.




On Sun, Jan 14, 2018 at 12:27 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Will do.
>
> On Sun, Jan 14, 2018 at 10:23 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> We don't have a schedule for bugfix releases but do them based on need.
>> AFAIK, a discussion about a 1.4.1 release has not been started yet.
>>
>> Would you like to kick that off by sending a mail to the dev mailing list?
>>
>>
>> 2018-01-12 16:41 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> Thanks.  We will.
>>>
>>>    When is 1.4.1 scheduled for release ?
>>>
>>> On Fri, Jan 12, 2018 at 3:24 AM, Dawid Wysakowicz <
>>> wysakowicz.da...@gmail.com> wrote:
>>>
>>>> Hi Vishal,
>>>> I think it might be due to this bug:
>>>> https://issues.apache.org/jira/browse/FLINK-8226
>>>> It was merged for 1.4.1 and 1.5.0. Could you check with these changes
>>>> applied? That would be really helpful. If the error still persists, could
>>>> you file a JIRA?
>>>>
>>>> Regards
>>>> Dawid
>>>>
>>>> > On 11 Jan 2018, at 19:49, Vishal Santoshi <vishal.santo...@gmail.com>
>>>> wrote:
>>>> >
>>>> > When checkpointing is turned on a simple CEP loop pattern
>>>> >
>>>> >  private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern =
>>>> >      Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
>>>> >         .followedBy("middle").where(checkStatusOn).times(2)
>>>> >         .next("end").where(checkStatusOn).within(Time.minutes(5))
>>>> >
>>>> > I see failures.
>>>> >
>>>> > SimpleBinaryEvent is
>>>> >
>>>> > public class SimpleBinaryEvent implements Serializable {
>>>> >
>>>> > private int id;
>>>> > private int sequence;
>>>> > private boolean status;
>>>> > private long time;
>>>> >
>>>> > public SimpleBinaryEvent(int id, int sequence, boolean status, long time) {
>>>> >     this.id = id;
>>>> >     this.sequence = sequence;
>>>> >     this.status = status;
>>>> >     this.time = time;
>>>> > }
>>>> > public int getId() {
>>>> >     return id;
>>>> > }
>>>> > public int getSequence() {
>>>> >     return sequence;
>>>> > }
>>>> > public boolean isStatus() {
>>>> >     return status;
>>>> > }
>>>> > public long getTime() {
>>>> >     return time;
>>>> > }
>>>> > @Override
>>>> > public boolean equals(Object o) {
>>>> >     if (this == o) return true;
>>>> >     if (o == null || getClass() != o.getClass()) return false;
>>>> >
>>>> >     SimpleBinaryEvent that = (SimpleBinaryEvent) o;
>>>> >
>>>> >     if (getId() != that.getId()) return false;
>>>> >     if (isStatus() != that.isStatus()) return false;
>>>> >     if (getSequence() != that.getSequence()) return false;
>>>> >     return getTime() == that.getTime();
>>>> > }
>>>> >
>>>> > @Override
>>>> > public int hashCode() {
>>>> >     //return Objects.hash(getId(), isStatus(), getSequence(), getTime());
>>>> >     int result = getId();
>>>> >     result = 31 * result + (isStatus() ? 1 : 0);
>>>> >     result = 31 * result + getSequence();
>>>> >     result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
>>>> >     return result;
>>>> > }
>>>> >
>>>> > @Override
>>>> > public String toString() {
>>>> >     return "SimpleBinaryEvent{" +
>>>> >             "id='" + id + '\'' +
>>>> >             ", status=" + status +
>>>> >             ", sequence=" + sequence +
>>>> >             ", time=" + time +
>>>> >             '}';
>>>> > }
>>>> >
>>>> > }
>>>> >
>>>> > failure cause:
>>>> >
>>>> > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
>>>> > ... 6 more
>>>> > Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....
>>>> >
>>>> > I am sure I have equals() and hashCode() implemented the way they
>>>> > should be. I have tried Objects.hashCode too. In other instances I have
>>>> > had a circular reference (and thus a stack overflow) on
>>>> > SharedBuffer.toString(), which again points to issues with references
>>>> > (equality and whatnot). Without checkpointing turned on it works as
>>>> > expected. I am running on a local cluster. Is CEP production ready?
>>>> >
>>>> > I am using Flink 1.3.2.
>>>> >
>>>>
>>>>
>>>
>>
>
