Hi Ishwara,

`keyBy()` on a `ConnectedStreams` already guarantees that records with the same 
key are processed by the same parallel instance of the CoFlatMap, even though 
the return type is still `ConnectedStreams`.
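
To illustrate the idea only: the sketch below is plain Java, not Flink code. 
`instanceFor` is a hypothetical helper, and the modulo routing is a deliberate 
simplification rather than Flink's actual key assignment. It only shows why 
equal keys from both inputs end up at the same instance.

```java
import java.util.Objects;

class KeyRoutingSketch {
    // Hypothetical helper, not a Flink API: picks one of `parallelism`
    // instances from the key's hashCode().
    static int instanceFor(Object key, int parallelism) {
        return Math.floorMod(Objects.hashCode(key), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Equal key values arriving on the two connected inputs.
        String keyFromInput1 = new String("order-42");
        String keyFromInput2 = new String("order-42");
        // Equal keys have equal hash codes, so both map to the same index.
        System.out.println(
            instanceFor(keyFromInput1, parallelism)
                == instanceFor(keyFromInput2, parallelism)); // prints "true"
    }
}
```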

As for the exception, I suppose the types `MessageType1` and `MessageType2` are 
meant to be POJOs, in which case they must follow the POJO rules [1]. The 
`GenericType` in the message suggests that Flink did not recognize 
`MessageType1` as one. 
Also, make sure that (1) `property1` and `property2` are not arrays, and 
(2) their types override the `hashCode()` method [2].
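
Here is a minimal sketch of both points in plain Java (no Flink dependency). 
The body of `MessageType1` and the `CustomerId` key type are hypothetical, 
since I don't know your actual classes; only the shape matters: a public class 
with a public no-argument constructor and public fields (or getters/setters), 
and a key type with a value-based `hashCode()`.

```java
import java.util.Arrays;

class PojoKeySketch {
    // Hypothetical message type shaped like a POJO: public class,
    // public no-arg constructor, public field.
    public static class MessageType1 {
        public String property1;
        public MessageType1() {}
    }

    // Hypothetical custom key type with value-based equals()/hashCode().
    public static class CustomerId {
        public long id;
        public CustomerId() {}
        public CustomerId(long id) { this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof CustomerId && ((CustomerId) o).id == id;
        }
        @Override public int hashCode() { return Long.hashCode(id); }
    }

    public static void main(String[] args) {
        MessageType1 msg = new MessageType1();
        msg.property1 = "order-42"; // String hashes by value, fine as a key

        CustomerId a = new CustomerId(7);
        CustomerId b = new CustomerId(7);
        // Equal values give equal hash codes, so the key is stable.
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // prints "true"

        int[] x = {1, 2};
        int[] y = {1, 2};
        // Arrays with equal contents are still distinct keys, because
        // equals()/hashCode() on an array are identity-based.
        System.out.println(Arrays.equals(x, y)); // prints "true"
        System.out.println(x.equals(y));         // prints "false"
    }
}
```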

Hope that helps,
Xingcan

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html#rules-for-pojo-types
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations

> On May 10, 2018, at 10:43 PM, Ishwara Varnasi <ivarn...@gmail.com> wrote:
> 
> Hello,
> I am using ConnectedStreams to process two different types of messages with 
> a CoFlatMap. I would like to use keyBy on the ConnectedStreams so that 
> messages with the same value of a certain property are always sent to the 
> same CoFlatMap instance. I tried keyBy on the ConnectedStreams and was 
> surprised to see that the return type is not grouped.
> 
> ConnectedStreams<MessageType1, MessageType2> connect = 
> myDataStream1.connect(myDataStreamOther);
> connect = connect.keyBy("property1", "property2");
> // property1 is a valid property in MessageType1 and property2 is a valid 
> // property of MessageType2
> However, I get following exception:
> Caused by: org.apache.flink.api.common.InvalidProgramException: This type 
> (GenericType<com....MessageType1>) cannot be used as key.
> How to use keyBy with ConnectedStream and ensure that grouped messages are 
> handled by same instance of CoFlatMap?
> 
> thanks
> Ishwara Varnasi
