Hi Ishwara, the `keyBy()` method automatically ensures that records with the same key will be processed by the same instance of a CoFlatMap.
As for the exception, I suppose the types `MessageType1` and `MessageType1` are POJOs which should follow some rules [1]. Also, make sure that (1) `property1` and `property2` are not arrays; (2) their types have overridden the `hashCode()` method [2]. Hope that helps, Xingcan [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html#rules-for-pojo-types <https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html#rules-for-pojo-types> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations> > On May 10, 2018, at 10:43 PM, Ishwara Varnasi <ivarn...@gmail.com> wrote: > > Hello, > I am using ConnectedStream to process two different types of messages using > CoFlatMap. However, I would like to use keyBy on the ConnectedStream such > that messages with same value of certain property should always be sent to > same instance of CoFlatMap instance. So I've tried keyBy on ConnectedStream, > surprised to see that the return type is not grouped. > > ConnectedStreams<MessageType1, MessageTyp2> connect = > myDataStream1.connect(myDataStreamOther); > connect = connect.keyBy("property1", "property2"); > // property1 is a valid property in MessageTyp1 and property2 is a valid > property of MessageType2 > However, I get following exception: > Caused by: org.apache.flink.api.common.InvalidProgramException: This type > (GenericType<com....MessageType1>) cannot be used as key. > How to use keyBy with ConnectedStream and ensure that grouped messages are > handled by same instance of CoFlatMap? > > thanks > Ishwara Varnasi