Hi Harshith,

You can replace GenericDataObject with a Tuple3 and keyBy("A", "B") with keyBy(0, 1), then give it a try. Tuple field positions are zero-based, so this assumes A, B and C are stored as f0, f1 and f2. You can also see the doc [1] for reference.
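For context on the "5>" in the output: when the sink parallelism is greater than 1, print() prefixes each line with the index of the parallel subtask that emitted it (starting at 1), not with a per-key identifier. keyBy hashes each key into a key group and gives each subtask a contiguous range of key groups. Whenever there are more distinct keys than subtasks, several keys must share a subtask, while keyed state is still kept separately per key. The Java sketch below is a simplified model of that assignment, not Flink's code. The class and method names are hypothetical, and it uses floorMod instead of the murmur hash Flink applies, so the concrete subtask numbers will not match a real job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyToSubtaskSketch {

    // Simplified stand-in for Flink's key-group assignment. Flink applies a
    // murmur hash to key.hashCode(); plain floorMod is used here only for
    // readability, so the concrete numbers differ from a real Flink job.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    // Groups the B values of composite (A, B) keys by the subtask they land on.
    static Map<Integer, List<String>> assign(String a, List<String> bValues,
                                             int maxParallelism, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String b : bValues) {
            // Composite key analogous to keyBy("A", "B"); List.hashCode combines both fields.
            Object key = Arrays.asList(a, b);
            int subtask = subtaskFor(key, maxParallelism, parallelism);
            bySubtask.computeIfAbsent(subtask, k -> new ArrayList<>()).add(b);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> bValues = Arrays.asList(
                "1546559224", "1546559127", "1546559234", "1546559254");
        // 128 is the lower bound Flink uses for its default max parallelism;
        // 3 subtasks is an arbitrary choice smaller than the number of keys.
        Map<Integer, List<String>> bySubtask = assign("352580084349898", bValues, 128, 3);
        // Mimic print(): prefix with (subtask index + 1).
        bySubtask.forEach((subtask, bs) ->
                System.out.println((subtask + 1) + "> B values " + bs));
    }
}
```

With 4 distinct keys and 3 subtasks, at least two keys necessarily land on the same subtask. This is the same effect as lines 1 and 2 of the output sharing "5>" even though their B values differ.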

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions

Best,
Congxian


Kumar Bolar, Harshith <hk...@arity.com> wrote on Tuesday, 29 January 2019 at 12:49 PM:

> Typo: lines 1, 2 and 5
>
> From: Harshith Kumar Bolar <hk...@arity.com>
> Date: Tuesday, 29 January 2019 at 10:16 AM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: KeyBy is not creating different keyed streams for different keys
>
> Hi all,
>
> I'm reading a simple JSON string as input and keying the stream based on
> two fields A and B. But KeyBy is generating the same keyed stream for
> different values of B but for a particular combination of A and B.
>
> The input:
>
> {
>   "A": "352580084349898",
>   "B": "1546559127",
>   "C": "A"
> }
>
> This is the core logic of my Flink code:
>
> DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
>     .map(new MapFunction<String, GenericDataObject>() {
>         @Override
>         public GenericDataObject map(String s) throws Exception {
>             JSONObject jsonObject = new JSONObject(s);
>             GenericDataObject genericDataObject = new GenericDataObject();
>             genericDataObject.setA(jsonObject.getString("A"));
>             genericDataObject.setB(jsonObject.getString("B"));
>             genericDataObject.setC(jsonObject.getString("C"));
>             return genericDataObject;
>         }
>     });
>
> DataStream<GenericDataObject> testStream = genericDataObjectDataStream
>     .keyBy("A", "B")
>     .map(new MapFunction<GenericDataObject, GenericDataObject>() {
>         @Override
>         public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
>             return genericDataObject;
>         }
>     });
>
> testStream.print();
>
> GenericDataObject is a POJO with three fields A, B and C.
>
> And this is the console output for different values of field B.
>
> 5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
> 5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
> 4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
> 3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}
> 5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
>
> Notice lines 1, 2 and 3. Even though they have different values of B, they
> are being put in the same keyed stream (5). I must be doing something
> fundamentally wrong here.
>
> Thanks,
> Harshith