That fixed my issue. Thanks. I also agree we need to fix the Documentation On Thu, Sep 7, 2017 at 6:15 PM, Timo Walther <twal...@apache.org> wrote:
> Hi Sridhar, > > according to the exception, your "meEvents" stream is not POJO. You can > check that by printing "meEvents.getType()". In general, you can always > check the log for such problems. There should be something like: > > 14:40:57,079 INFO org.apache.flink.api.java.type > utils.TypeExtractor - class org.apache.flink.streaming.exa > mples.wordcount.WordCount$MyEvent does not contain a setter for field > responseTime > 14:40:57,083 INFO org.apache.flink.api.java.type > utils.TypeExtractor - class org.apache.flink.streaming.exa > mples.wordcount.WordCount$MyEvent is not a valid POJO type because not > all fields are valid POJO fields. > > The problem is that your setters have a return type. A POJO setter usually > should have a void return type. But I agree that this should be mentioned > in the documentation. > > Regards, > Timo > > > Am 07.09.17 um 05:20 schrieb Sridhar Chellappa: > > I am trying to use the KeyBy operator as follows : >> >> >> Pattern<MyEvent, ?> myEventsCEPPattern = >> Pattern.<MyEvent>begin("FirstEvent") >> .subtype(MyEvent.class) >> .next("SecondEvent") >> .subtype(MyEvent.class) >> .within(Time.hours(1)); >> >> >> >> PatternStream<MyEvent> myEventsPatternStream = >> CEP.pattern( >> meEvents.keyBy("field1", "field6"), >> myEventsCEPPattern >> ); >> >> >> >> When I run the program, I get the following exception: >> >> The program finished with the following exception: >> >> This type (GenericType<com.events.MyEvent>) cannot be used as key. >> org.apache.flink.api.common.operators.Keys$ExpressionKeys.< >> init>(Keys.java:330) >> org.apache.flink.streaming.api.datastream.DataStream.keyBy( >> DataStream.java:294) >> >> >> MyEvent is a POJO. What is that I am doing wrong? >> >> >> Here is the relevant code : >> >> public abstract class AbstractEvent { >> private String field1; >> private String field2; >> private String field3; >> private String field4; >> private Timestamp eventTimestmp; >> >> public AbstractEvent(String field1, String field2, String field3, >> String field4, Timestamp eventTimestmp) { >> this.field1 = field1; >> this.field2 = field2; >> this.field3 = field3; >> this.field4 = field4; >> this.eventTimestmp = eventTimestmp; >> } >> >> public AbstractEvent() { >> } >> >> public String getField1() { >> return field1; >> } >> >> public AbstractEvent setField1(String field1) { >> this.field1 = field1; >> return this; >> } >> >> public String getField2() { >> return field2; >> } >> >> public AbstractEvent setField2(String field2) { >> this.field2 = field2; >> return this; >> } >> >> public String getField3() { >> return field3; >> } >> >> public AbstractEvent setField3(String field3) { >> this.field3 = field3; >> return this; >> } >> >> public String getField4() { >> return field4; >> } >> >> public AbstractEvent setField4(String field4) { >> this.field4 = field4; >> return this; >> } >> >> public Timestamp getEventTimestmp() { >> return eventTimestmp; >> } >> >> public AbstractEvent setEventTimestmp(Timestamp eventTimestmp) { >> this.eventTimestmp = eventTimestmp; >> return this; >> } >> >> @Override >> public boolean equals(Object o) { >> if (this == o) { >> return true; >> } >> if (!(o instanceof AbstractEvent)) { >> return false; >> } >> >> AbstractEvent that = (AbstractEvent) o; >> >> if (!getField1().equals(that.getField1())) { >> return false; >> } >> if (!getField2().equals(that.getField2())) { >> return false; >> } >> if (!getField3().equals(that.getField3())) { >> return false; >> } >> if (!getField4().equals(that.getField4())) { >> return false; >> } >> return getEventTimestmp().equals(that.getEventTimestmp()); >> } >> >> @SuppressWarnings({"MagicNumber"}) >> @Override >> public int hashCode() { >> int result = getField1().hashCode(); >> result = 31 * result + getField2().hashCode(); >> result = 31 * result + getField3().hashCode(); >> result = 31 * result + getField4().hashCode(); >> result = 31 * result + getEventTimestmp().hashCode(); >> return result; >> } >> >> @Override >> public String toString() { >> return "AbstractEvent{" >> + "field1='" + field1 + '\'' >> + ", field2='" + field2 + '\'' >> + ", field3='" + field3 + '\'' >> + ", field4='" + field4 + '\'' >> + ", eventTimestmp=" + eventTimestmp >> + '}'; >> } >> } >> >> >> public class MyEvent extends AbstractEvent { >> >> private Timestamp responseTime; >> private String field5; >> private String field6; >> private int field7; >> private String field8; >> private String field9; >> >> public MyEvent(String field1, String field2, String field3, String >> field4, Timestamp eventTimestmp, >> Timestamp responseTime, >> String field5, String field6, int field7, String field8, >> String field9) { >> super(field1, field2, field3, field4, eventTimestmp); >> this.responseTime = responseTime; >> this.field5 = field5; >> this.field6 = field6; >> this.field7 = field7; >> this.field8 = field8; >> this.field9 = field9; >> } >> >> public MyEvent() { >> super(); >> } >> >> public int getField7() { >> return field7; >> } >> >> public String getField8() { >> return field8; >> } >> >> public String getField9() { >> return field9; >> } >> >> public String getField5() { >> return field5; >> } >> >> public String getField6() { >> return field6; >> } >> >> public Timestamp getResponseTime() { >> return responseTime; >> } >> >> public MyEvent setResponseTime(Timestamp responseTime) { >> this.responseTime = responseTime; >> return this; >> } >> >> public MyEvent setField7(int field7) { >> this.field7 = field7; >> return this; >> } >> >> public MyEvent setField8(String field8) { >> this.field8 = field8; >> return this; >> } >> >> public MyEvent setField9(String field9) { >> this.field9 = field9; >> return this; >> } >> >> public MyEvent setField5(String field5) { >> this.field5 = field5; >> return this; >> } >> >> public MyEvent setField6(String field6) { >> this.field6 = field6; >> return this; >> } >> >> @Override >> public boolean equals(Object o) { >> if (this == o) return true; >> if (!(o instanceof MyEvent)) return false; >> if (!super.equals(o)) return false; >> >> MyEvent that = (MyEvent) o; >> >> if (getField7() != that.getField7()) return false; >> if (!getResponseTime().equals(that.getResponseTime())) return >> false; >> if (!getField5().equals(that.getField5())) return false; >> if (!getField6().equals(that.getField6())) return false; >> if (!getField8().equals(that.getField8())) return false; >> return getField9().equals(that.getField9()); >> } >> >> @Override >> public int hashCode() { >> int result = super.hashCode(); >> result = 31 * result + getResponseTime().hashCode(); >> result = 31 * result + getField5().hashCode(); >> result = 31 * result + getField6().hashCode(); >> result = 31 * result + getField7(); >> result = 31 * result + getField8().hashCode(); >> result = 31 * result + getField9().hashCode(); >> return result; >> } >> } >> >> >> >> >> >> >> >