I am trying to use the `keyBy` operator as follows:
Pattern<MyEvent, ?> myEventsCEPPattern =
        Pattern.<MyEvent>begin("FirstEvent")
                .subtype(MyEvent.class)
                .next("SecondEvent")
                .subtype(MyEvent.class)
                .within(Time.hours(1));

PatternStream<MyEvent> myEventsPatternStream =
        CEP.pattern(
                meEvents.keyBy("field1", "field6"),
                myEventsCEPPattern
        );

When I run the program, I get the following exception:

The program finished with the following exception:
This type (GenericType<com.events.MyEvent>) cannot be used as key.
    org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
    org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)

MyEvent is a POJO. What is that I am doing wrong? Here is the relevant code :

/**
 * Base class for events: four string fields plus an event timestamp.
 *
 * <p>All fields are mutable and may be {@code null} (the no-arg constructor
 * leaves every field unset), so {@link #equals(Object)} and {@link #hashCode()}
 * are written to tolerate {@code null} values.
 *
 * <p>NOTE(review): the setters are fluent (they return {@code this}). Flink's
 * POJO analysis has, at least in some versions, only accepted setters that
 * return {@code void}; that may be why the type is treated as a
 * {@code GenericType} and rejected by {@code keyBy("field1", ...)} - confirm
 * against the Flink version in use.
 */
public abstract class AbstractEvent {

    private String field1;
    private String field2;
    private String field3;
    private String field4;
    private Timestamp eventTimestmp;

    /**
     * Creates an event with every base field set.
     *
     * @param field1 value for field1
     * @param field2 value for field2
     * @param field3 value for field3
     * @param field4 value for field4
     * @param eventTimestmp timestamp of the event
     */
    public AbstractEvent(String field1, String field2, String field3, String field4,
                         Timestamp eventTimestmp) {
        this.field1 = field1;
        this.field2 = field2;
        this.field3 = field3;
        this.field4 = field4;
        this.eventTimestmp = eventTimestmp;
    }

    /** Creates an event with all fields left {@code null}. */
    public AbstractEvent() {
    }

    /** @return the current value of field1, possibly {@code null} */
    public String getField1() {
        return field1;
    }

    /**
     * @param field1 new value for field1
     * @return this instance, to allow call chaining
     */
    public AbstractEvent setField1(String field1) {
        this.field1 = field1;
        return this;
    }

    /** @return the current value of field2, possibly {@code null} */
    public String getField2() {
        return field2;
    }

    /**
     * @param field2 new value for field2
     * @return this instance, to allow call chaining
     */
    public AbstractEvent setField2(String field2) {
        this.field2 = field2;
        return this;
    }

    /** @return the current value of field3, possibly {@code null} */
    public String getField3() {
        return field3;
    }

    /**
     * @param field3 new value for field3
     * @return this instance, to allow call chaining
     */
    public AbstractEvent setField3(String field3) {
        this.field3 = field3;
        return this;
    }

    /** @return the current value of field4, possibly {@code null} */
    public String getField4() {
        return field4;
    }

    /**
     * @param field4 new value for field4
     * @return this instance, to allow call chaining
     */
    public AbstractEvent setField4(String field4) {
        this.field4 = field4;
        return this;
    }

    /** @return the event timestamp, possibly {@code null} */
    public Timestamp getEventTimestmp() {
        return eventTimestmp;
    }

    /**
     * @param eventTimestmp new event timestamp
     * @return this instance, to allow call chaining
     */
    public AbstractEvent setEventTimestmp(Timestamp eventTimestmp) {
        this.eventTimestmp = eventTimestmp;
        return this;
    }

    /**
     * Compares the five base fields (read through their getters). Any
     * {@code AbstractEvent} instance is accepted as the other operand.
     * Fields that are {@code null} on both sides count as equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AbstractEvent)) {
            return false;
        }
        AbstractEvent that = (AbstractEvent) o;
        // Objects.equals instead of a direct .equals() call: fields are null
        // after the no-arg constructor and must not trigger an NPE here.
        return java.util.Objects.equals(getField1(), that.getField1())
                && java.util.Objects.equals(getField2(), that.getField2())
                && java.util.Objects.equals(getField3(), that.getField3())
                && java.util.Objects.equals(getField4(), that.getField4())
                && java.util.Objects.equals(getEventTimestmp(), that.getEventTimestmp());
    }

    /**
     * Combines the five base fields with the usual 31-multiplier scheme.
     * A {@code null} field contributes 0; for non-null fields the result is
     * the same as calling {@code hashCode()} on each field directly.
     */
    @SuppressWarnings({"MagicNumber"})
    @Override
    public int hashCode() {
        int result = java.util.Objects.hashCode(getField1());
        result = 31 * result + java.util.Objects.hashCode(getField2());
        result = 31 * result + java.util.Objects.hashCode(getField3());
        result = 31 * result + java.util.Objects.hashCode(getField4());
        result = 31 * result + java.util.Objects.hashCode(getEventTimestmp());
        return result;
    }

    @Override
    public String toString() {
        return "AbstractEvent{"
                + "field1='" + field1 + '\''
                + ", field2='" + field2 + '\''
                + ", field3='" + field3 + '\''
                + ", field4='" + field4 + '\''
                + ", eventTimestmp=" + eventTimestmp
                + '}';
    }
}

/**
 * Concrete event adding a response time and fields 5 to 9 to
 * {@link AbstractEvent}.
 *
 * <p>As in the base class, every reference field may be {@code null} and the
 * setters return {@code this} for chaining.
 */
public class MyEvent extends AbstractEvent {

    private Timestamp responseTime;
    private String field5;
    private String field6;
    private int field7;
    private String field8;
    private String field9;

    /**
     * Creates a fully populated event; the first five arguments are passed to
     * the {@link AbstractEvent} constructor.
     */
    public MyEvent(String field1, String field2, String field3, String field4,
                   Timestamp eventTimestmp, Timestamp responseTime, String field5,
                   String field6, int field7, String field8, String field9) {
        super(field1, field2, field3, field4, eventTimestmp);
        this.responseTime = responseTime;
        this.field5 = field5;
        this.field6 = field6;
        this.field7 = field7;
        this.field8 = field8;
        this.field9 = field9;
    }

    /** Creates an event with all reference fields {@code null} and field7 == 0. */
    public MyEvent() {
        super();
    }

    /** @return the current value of field7 */
    public int getField7() {
        return field7;
    }

    /** @return the current value of field8, possibly {@code null} */
    public String getField8() {
        return field8;
    }

    /** @return the current value of field9, possibly {@code null} */
    public String getField9() {
        return field9;
    }

    /** @return the current value of field5, possibly {@code null} */
    public String getField5() {
        return field5;
    }

    /** @return the current value of field6, possibly {@code null} */
    public String getField6() {
        return field6;
    }

    /** @return the response time, possibly {@code null} */
    public Timestamp getResponseTime() {
        return responseTime;
    }

    /**
     * @param responseTime new response time
     * @return this instance, to allow call chaining
     */
    public MyEvent setResponseTime(Timestamp responseTime) {
        this.responseTime = responseTime;
        return this;
    }

    /**
     * @param field7 new value for field7
     * @return this instance, to allow call chaining
     */
    public MyEvent setField7(int field7) {
        this.field7 = field7;
        return this;
    }

    /**
     * @param field8 new value for field8
     * @return this instance, to allow call chaining
     */
    public MyEvent setField8(String field8) {
        this.field8 = field8;
        return this;
    }

    /**
     * @param field9 new value for field9
     * @return this instance, to allow call chaining
     */
    public MyEvent setField9(String field9) {
        this.field9 = field9;
        return this;
    }

    /**
     * @param field5 new value for field5
     * @return this instance, to allow call chaining
     */
    public MyEvent setField5(String field5) {
        this.field5 = field5;
        return this;
    }

    /**
     * @param field6 new value for field6
     * @return this instance, to allow call chaining
     */
    public MyEvent setField6(String field6) {
        this.field6 = field6;
        return this;
    }

    /**
     * Equal when the other object is a {@code MyEvent}, the base-class fields
     * match, and response time and fields 5 to 9 match. Fields that are
     * {@code null} on both sides count as equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MyEvent)) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        MyEvent that = (MyEvent) o;
        // Null-safe comparisons: a default-constructed event has null fields.
        return getField7() == that.getField7()
                && java.util.Objects.equals(getResponseTime(), that.getResponseTime())
                && java.util.Objects.equals(getField5(), that.getField5())
                && java.util.Objects.equals(getField6(), that.getField6())
                && java.util.Objects.equals(getField8(), that.getField8())
                && java.util.Objects.equals(getField9(), that.getField9());
    }

    /**
     * Extends the base-class hash with response time and fields 5 to 9, in
     * that order. A {@code null} field contributes 0; for non-null fields the
     * value matches a direct {@code hashCode()} call on each field.
     */
    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + java.util.Objects.hashCode(getResponseTime());
        result = 31 * result + java.util.Objects.hashCode(getField5());
        result = 31 * result + java.util.Objects.hashCode(getField6());
        result = 31 * result + getField7();
        result = 31 * result + java.util.Objects.hashCode(getField8());
        result = 31 * result + java.util.Objects.hashCode(getField9());
        return result;
    }
}