Hi all, I'm reading a simple JSON string as input and keying the stream on two fields, A and B. However, for certain combinations of A and B, keyBy seems to put records with different values of B into the same keyed stream.
The input:

```json
{ "A": "352580084349898", "B": "1546559127", "C": "A" }
```

This is the core logic of my Flink code:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.json.JSONObject;

// Parse each JSON string into a GenericDataObject POJO.
DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
        .map(new MapFunction<String, GenericDataObject>() {
            @Override
            public GenericDataObject map(String s) throws Exception {
                JSONObject jsonObject = new JSONObject(s);
                GenericDataObject genericDataObject = new GenericDataObject();
                genericDataObject.setA(jsonObject.getString("A"));
                genericDataObject.setB(jsonObject.getString("B"));
                genericDataObject.setC(jsonObject.getString("C"));
                return genericDataObject;
            }
        });

// Key by the composite (A, B), then pass records through unchanged
// so the print() output shows where each record ends up.
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
        .keyBy("A", "B")
        .map(new MapFunction<GenericDataObject, GenericDataObject>() {
            @Override
            public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
                return genericDataObject;
            }
        });

testStream.print();
```

GenericDataObject is a POJO with three fields: A, B and C.

This is the console output for different values of field B:

```
5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
```

Notice lines 1, 2 and 5: all three are printed with the prefix 5>. Even though line 2 has a different value of B from lines 1 and 5, it is put in the same keyed stream (5). I must be doing something fundamentally wrong here.

Thanks,
Harshith
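For reference, the `N>` prefix that `print()` writes is the 1-based index of the parallel subtask that emitted the line, not an identifier of the key. The sketch below is a simplified, JDK-only model of how a keyBy on (A, B) can route keys to subtasks, assuming hash partitioning into key groups that are split into contiguous ranges, one range per subtask. It is not Flink's code: Flink additionally scrambles the key's hashCode with a murmur hash and uses its own composite key type for `keyBy("A", "B")`, so the concrete indices printed here will not match a real job. The names `KeyToSubtaskSketch`, `keyGroupFor`, `subtaskFor` and `subtaskForRecord` are hypothetical.

```java
import java.util.List;

public class KeyToSubtaskSketch {

    // Simplified: map a key to one of maxParallelism key groups.
    // (Assumption: real Flink applies a murmur hash to hashCode() first.)
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Composite key (A, B), modelled here as an immutable two-element list.
    static int subtaskForRecord(String a, String b, int maxParallelism, int parallelism) {
        List<String> key = List.of(a, b);
        return subtaskFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed; 128 is Flink's default for small jobs
        int parallelism = 8;      // hypothetical job parallelism
        String a = "352580084349898";
        String[] bs = {"1546559224", "1546559127", "1546559234", "1546559254", "1546559224"};
        for (String b : bs) {
            int subtask = subtaskForRecord(a, b, maxParallelism, parallelism);
            // Mimic print(): prefix with the 1-based subtask index.
            System.out.println((subtask + 1) + "> A=" + a + ", B=" + b);
        }
    }
}
```

In this model a repeated key always lands on the same subtask, and whenever there are more distinct (A, B) pairs than subtasks, some distinct pairs must share a prefix.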