Hello,
I am using ConnectedStream to process two different types of messages using
CoFlatMap. However, I would like to use keyBy on the ConnectedStream such
that messages with same value of certain property should always be sent to
same instance of CoFlatMap instance. So I've tried keyBy on
ConnectedStream, surprised to see that the return type is not grouped.

ConnectedStreams<MessageType1, MessageTyp2> connect =
myDataStream1.connect(myDataStreamOther);
connect = connect.keyBy("property1", "property2");

// property1 is a valid property in MessageTyp1 and property2 is a
valid property of MessageType2

However, I get following exception:

Caused by: org.apache.flink.api.common.InvalidProgramException: This
type (GenericType<com....MessageType1>) cannot be used as key.

How to use keyBy with ConnectedStream and ensure that grouped messages are
handled by same instance of CoFlatMap?

thanks
Ishwara Varnasi

Reply via email to