I have a DataStream on which I am applying a CEP pattern and grouping the results using keyBy(). The DataStream object is a POJO:

    public class DataStreamObject {
        private String field1;
        private String field2;

        public DataStreamObject(String field1, String field2) {
            this.field1 = field1;
            this.field2 = field2;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        public String getField1() {
            return field1;
        }

        public void setField2(String field2) {
            this.field2 = field2;
        }

        public String getField2() {
            return field2;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof DataStreamObject)) return false;

            DataStreamObject that = (DataStreamObject) o;

            if (!getField1().equals(that.getField1())) return false;
            return getField2().equals(that.getField2());
        }

        @Override
        public int hashCode() {
            int result = getField1().hashCode();
            result = 31 * result + getField2().hashCode();
            return result;
        }

        @Override
        public String toString() {
            return "DriverSameAsCustomer{" +
                    "field1='" + field1 + '\'' +
                    ", field2='" + field2 + '\'' +
                    '}';
        }
    }

When I submit my Flink job, I get the following error:

    This type (GenericType<com.foo.DataStreamObject>) cannot be used as key.
        org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
        org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)
        com.foo.Main.main(Main.java:66)

As I understand it, I do not need to implement the Key interface if the class is a POJO (which it is). Please help me understand where I am going wrong and suggest a fix.
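
For reference, the POJO rules as I read them in the Flink documentation are: the class must be public, it must have a public constructor without arguments, and every field must either be public or have a getter and a setter. Below is a minimal sketch that checks those three rules with plain Java reflection. It does not call Flink at all, so it only approximates what Flink's type extraction does. The names `PojoRuleCheck`, `Sample`, and `violations` are hypothetical and exist only for this illustration; `Sample` is a trimmed-down stand-in for the class above.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRuleCheck {

    // Hypothetical stand-in for DataStreamObject: private fields, getters and
    // setters, and only a two-argument constructor, as in the question.
    public static class Sample {
        private String field1;
        private String field2;

        public Sample(String field1, String field2) {
            this.field1 = field1;
            this.field2 = field2;
        }

        public String getField1() { return field1; }
        public void setField1(String field1) { this.field1 = field1; }
        public String getField2() { return field2; }
        public void setField2(String field2) { this.field2 = field2; }
    }

    // Returns a description of every documented POJO rule the class breaks.
    // Assumption: this mirrors the documented rules only, not Flink internals.
    public static List<String> violations(Class<?> type) {
        List<String> problems = new ArrayList<>();

        // Rule 1: the class must be public.
        if (!Modifier.isPublic(type.getModifiers())) {
            problems.add("class is not public");
        }

        // Rule 2: a public constructor without arguments must exist.
        // getConstructors() returns public constructors only.
        boolean hasNoArgConstructor = false;
        for (Constructor<?> c : type.getConstructors()) {
            if (c.getParameterCount() == 0) {
                hasNoArgConstructor = true;
            }
        }
        if (!hasNoArgConstructor) {
            problems.add("no public no-argument constructor");
        }

        // Rule 3: each non-static, non-transient field is public or has
        // a public getX() and setX(value) pair.
        for (Field f : type.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m) || Modifier.isPublic(m)) {
                continue;
            }
            String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
            boolean getter = hasPublicMethod(type, "get" + suffix);
            boolean setter = hasPublicMethod(type, "set" + suffix, f.getType());
            if (!getter || !setter) {
                problems.add("field '" + f.getName() + "' lacks a public getter or setter");
            }
        }
        return problems;
    }

    private static boolean hasPublicMethod(Class<?> type, String name, Class<?>... params) {
        try {
            type.getMethod(name, params); // looks up public methods only
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // prints: [no public no-argument constructor]
        System.out.println(violations(Sample.class));
    }
}
```

Run against the stand-in, the sketch reports only the missing no-argument constructor. That may be why Flink falls back to GenericType for my class, though I have not confirmed this against Flink itself.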