I am trying to use the keyBy operator as follows:

    // CEP pattern with two stages, both restricted to MyEvent via subtype():
    // a stage named "FirstEvent", then a stage named "SecondEvent" attached
    // with next(), with the whole pattern bounded by within(Time.hours(1)).
    Pattern<MyEvent, ?> myEventsCEPPattern =
                    Pattern.<MyEvent>begin("FirstEvent")
                            .subtype(MyEvent.class)
                            .next("SecondEvent")
                            .subtype(MyEvent.class)
                            .within(Time.hours(1));



        // Apply the pattern to the input stream keyed by the field expressions
        // "field1" and "field6". The keyBy(...) call is the frame shown in the
        // reported stack trace (DataStream.keyBy -> Keys$ExpressionKeys.<init>).
        // NOTE(review): meEvents is defined outside this snippet; presumably it
        // is a DataStream<MyEvent> - confirm.
        PatternStream<MyEvent> myEventsPatternStream =
                CEP.pattern(
                        meEvents.keyBy("field1", "field6"),
                        myEventsCEPPattern
                );



When I run the program, I get the following exception:

The program finished with the following exception:

This type (GenericType<com.events.MyEvent>) cannot be used as key.

org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)

org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)


MyEvent is a POJO. What is it that I am doing wrong?


Here is the relevant code:

/**
 * Base class for events carrying four string fields and an event timestamp.
 *
 * <p>All fields are private and exposed through getters and fluent setters
 * (each setter returns {@code this}). Instances created with the no-argument
 * constructor start with every field {@code null}; {@link #equals(Object)} and
 * {@link #hashCode()} are null-safe so such instances can still be compared
 * and hashed.
 *
 * <p>NOTE(review): some Apache Flink releases only recognise a setter as a
 * POJO setter when it returns {@code void}. If this class is meant to be
 * analysed as a Flink POJO (e.g. for {@code keyBy} on field names), confirm
 * the fluent setters against the POJO rules of the Flink version in use.
 */
public abstract class AbstractEvent {
    private String field1;
    private String field2;
    private String field3;
    private String field4;
    private Timestamp eventTimestmp;

    /**
     * Creates an event with every field populated.
     *
     * @param field1 value for field1
     * @param field2 value for field2
     * @param field3 value for field3
     * @param field4 value for field4
     * @param eventTimestmp value for the event timestamp
     */
    public AbstractEvent(String field1, String field2, String field3,
                         String field4, Timestamp eventTimestmp) {
        this.field1 = field1;
        this.field2 = field2;
        this.field3 = field3;
        this.field4 = field4;
        this.eventTimestmp = eventTimestmp;
    }

    /** Creates an event with all fields left {@code null}. */
    public AbstractEvent() {
    }

    /** @return the current value of field1, possibly {@code null} */
    public String getField1() {
        return field1;
    }

    /**
     * Sets field1.
     *
     * @param field1 the new value
     * @return this event, to allow call chaining
     */
    public AbstractEvent setField1(String field1) {
        this.field1 = field1;
        return this;
    }

    /** @return the current value of field2, possibly {@code null} */
    public String getField2() {
        return field2;
    }

    /**
     * Sets field2.
     *
     * @param field2 the new value
     * @return this event, to allow call chaining
     */
    public AbstractEvent setField2(String field2) {
        this.field2 = field2;
        return this;
    }

    /** @return the current value of field3, possibly {@code null} */
    public String getField3() {
        return field3;
    }

    /**
     * Sets field3.
     *
     * @param field3 the new value
     * @return this event, to allow call chaining
     */
    public AbstractEvent setField3(String field3) {
        this.field3 = field3;
        return this;
    }

    /** @return the current value of field4, possibly {@code null} */
    public String getField4() {
        return field4;
    }

    /**
     * Sets field4.
     *
     * @param field4 the new value
     * @return this event, to allow call chaining
     */
    public AbstractEvent setField4(String field4) {
        this.field4 = field4;
        return this;
    }

    /** @return the current event timestamp, possibly {@code null} */
    public Timestamp getEventTimestmp() {
        return eventTimestmp;
    }

    /**
     * Sets the event timestamp.
     *
     * @param eventTimestmp the new value
     * @return this event, to allow call chaining
     */
    public AbstractEvent setEventTimestmp(Timestamp eventTimestmp) {
        this.eventTimestmp = eventTimestmp;
        return this;
    }

    /**
     * Compares this event with another object field by field.
     *
     * <p>Two events are equal when {@code o} is an {@code AbstractEvent} and
     * all five fields are equal; two {@code null} values count as equal.
     *
     * @param o the object to compare with
     * @return {@code true} if {@code o} is an equal event
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AbstractEvent)) {
            return false;
        }

        AbstractEvent that = (AbstractEvent) o;

        // Objects.equals is used (fully qualified, so no extra import is
        // needed) because any field may be null, e.g. after the no-arg
        // constructor; calling equals() on the getter result would throw.
        return java.util.Objects.equals(getField1(), that.getField1())
                && java.util.Objects.equals(getField2(), that.getField2())
                && java.util.Objects.equals(getField3(), that.getField3())
                && java.util.Objects.equals(getField4(), that.getField4())
                && java.util.Objects.equals(getEventTimestmp(), that.getEventTimestmp());
    }

    /**
     * Computes a hash from all five fields, consistent with
     * {@link #equals(Object)}.
     *
     * <p>A {@code null} field contributes 0; for fully populated events the
     * result is the same 31-based polynomial over the field hashes as before.
     *
     * @return the hash code
     */
    @SuppressWarnings({"MagicNumber"})
    @Override
    public int hashCode() {
        int result = java.util.Objects.hashCode(getField1());
        result = 31 * result + java.util.Objects.hashCode(getField2());
        result = 31 * result + java.util.Objects.hashCode(getField3());
        result = 31 * result + java.util.Objects.hashCode(getField4());
        result = 31 * result + java.util.Objects.hashCode(getEventTimestmp());
        return result;
    }

    /**
     * Renders the five fields for diagnostics.
     *
     * @return a string of the form {@code AbstractEvent{field1='...', ...}}
     */
    @Override
    public String toString() {
        return "AbstractEvent{"
                + "field1='" + field1 + '\''
                + ", field2='" + field2 + '\''
                + ", field3='" + field3 + '\''
                + ", field4='" + field4 + '\''
                + ", eventTimestmp=" + eventTimestmp
                + '}';
    }
}


/**
 * Concrete event that extends {@link AbstractEvent} with a response time,
 * four additional string fields (field5, field6, field8, field9) and an
 * integer field7.
 *
 * <p>All fields are private and exposed through getters and fluent setters
 * (each setter returns {@code this}). The fields declared here are compared
 * and hashed null-safely, since the no-argument constructor leaves the
 * reference fields {@code null}.
 *
 * <p>NOTE(review): some Apache Flink releases only recognise a setter as a
 * POJO setter when it returns {@code void}; otherwise the type may be treated
 * as a generic type that cannot be keyed by field name. Confirm the fluent
 * setters against the POJO rules of the Flink version in use.
 */
public class MyEvent extends AbstractEvent {

    private Timestamp responseTime;
    private String field5;
    private String field6;
    private int field7;
    private String field8;
    private String field9;

    /**
     * Creates an event with every field populated.
     *
     * @param field1 passed to the superclass constructor
     * @param field2 passed to the superclass constructor
     * @param field3 passed to the superclass constructor
     * @param field4 passed to the superclass constructor
     * @param eventTimestmp passed to the superclass constructor
     * @param responseTime value for the response time
     * @param field5 value for field5
     * @param field6 value for field6
     * @param field7 value for field7
     * @param field8 value for field8
     * @param field9 value for field9
     */
    public MyEvent(String field1, String field2, String field3, String field4,
                   Timestamp eventTimestmp, Timestamp responseTime,
                   String field5, String field6, int field7, String field8,
                   String field9) {
        super(field1, field2, field3, field4, eventTimestmp);
        this.responseTime = responseTime;
        this.field5 = field5;
        this.field6 = field6;
        this.field7 = field7;
        this.field8 = field8;
        this.field9 = field9;
    }

    /** Creates an event with reference fields {@code null} and field7 zero. */
    public MyEvent() {
        super();
    }

    /** @return the current value of field7 */
    public int getField7() {
        return field7;
    }

    /** @return the current value of field8, possibly {@code null} */
    public String getField8() {
        return field8;
    }

    /** @return the current value of field9, possibly {@code null} */
    public String getField9() {
        return field9;
    }

    /** @return the current value of field5, possibly {@code null} */
    public String getField5() {
        return field5;
    }

    /** @return the current value of field6, possibly {@code null} */
    public String getField6() {
        return field6;
    }

    /** @return the current response time, possibly {@code null} */
    public Timestamp getResponseTime() {
        return responseTime;
    }

    /**
     * Sets the response time.
     *
     * @param responseTime the new value
     * @return this event, to allow call chaining
     */
    public MyEvent setResponseTime(Timestamp responseTime) {
        this.responseTime = responseTime;
        return this;
    }

    /**
     * Sets field7.
     *
     * @param field7 the new value
     * @return this event, to allow call chaining
     */
    public MyEvent setField7(int field7) {
        this.field7 = field7;
        return this;
    }

    /**
     * Sets field8.
     *
     * @param field8 the new value
     * @return this event, to allow call chaining
     */
    public MyEvent setField8(String field8) {
        this.field8 = field8;
        return this;
    }

    /**
     * Sets field9.
     *
     * @param field9 the new value
     * @return this event, to allow call chaining
     */
    public MyEvent setField9(String field9) {
        this.field9 = field9;
        return this;
    }

    /**
     * Sets field5.
     *
     * @param field5 the new value
     * @return this event, to allow call chaining
     */
    public MyEvent setField5(String field5) {
        this.field5 = field5;
        return this;
    }

    /**
     * Sets field6.
     *
     * @param field6 the new value
     * @return this event, to allow call chaining
     */
    public MyEvent setField6(String field6) {
        this.field6 = field6;
        return this;
    }

    /**
     * Compares this event with another object.
     *
     * <p>Equal when {@code o} is a {@code MyEvent}, the superclass comparison
     * succeeds, and all fields declared in this class are equal; two
     * {@code null} values count as equal.
     *
     * @param o the object to compare with
     * @return {@code true} if {@code o} is an equal event
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MyEvent)) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }

        MyEvent that = (MyEvent) o;

        // Objects.equals is used (fully qualified, so no extra import is
        // needed) because the reference fields may be null, e.g. after the
        // no-arg constructor; calling equals() on the getter result would throw.
        return getField7() == that.getField7()
                && java.util.Objects.equals(getResponseTime(), that.getResponseTime())
                && java.util.Objects.equals(getField5(), that.getField5())
                && java.util.Objects.equals(getField6(), that.getField6())
                && java.util.Objects.equals(getField8(), that.getField8())
                && java.util.Objects.equals(getField9(), that.getField9());
    }

    /**
     * Computes a hash from the superclass hash and the fields declared here,
     * consistent with {@link #equals(Object)}.
     *
     * <p>A {@code null} field of this class contributes 0; for fully populated
     * events the result is the same 31-based polynomial as before.
     *
     * @return the hash code
     */
    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + java.util.Objects.hashCode(getResponseTime());
        result = 31 * result + java.util.Objects.hashCode(getField5());
        result = 31 * result + java.util.Objects.hashCode(getField6());
        result = 31 * result + getField7();
        result = 31 * result + java.util.Objects.hashCode(getField8());
        result = 31 * result + java.util.Objects.hashCode(getField9());
        return result;
    }
}

Reply via email to