Hi Harshith,

You could try replacing GenericDataObject with a Tuple3, and keyBy("A", "B")
with keyBy(0, 1). Tuple field positions are zero-based, so this assumes A and
B are the first two fields of the tuple.
See the docs [1] for reference.
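
For illustration, here is a minimal plain-Java sketch of what keying on two
fields means. It uses no Flink classes, and CompositeKeySketch, Key,
subtaskFor and route are hypothetical names made up for this example. It also
assumes a simplified routing rule (hash modulo parallelism), which is not
Flink's actual assignment logic. The point is only that records with the same
(A, B) share one key, while several distinct keys can still be handled by the
same parallel subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Plain-Java illustration only; all names here are hypothetical.
class CompositeKeySketch {

    // Stand-in for a composite key built from fields A and B.
    static final class Key {
        final String a;
        final String b;

        Key(String a, String b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return a.equals(other.a) && b.equals(other.b);
        }

        @Override
        public int hashCode() {
            return Objects.hash(a, b);
        }

        @Override
        public String toString() {
            return "(" + a + ", " + b + ")";
        }
    }

    // Simplified assumption: subtask index = non-negative hash modulo parallelism.
    static int subtaskFor(Key key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups keys by the subtask index they map to under the rule above.
    static Map<Integer, List<Key>> route(List<Key> keys, int parallelism) {
        Map<Integer, List<Key>> bySubtask = new TreeMap<>();
        for (Key key : keys) {
            bySubtask.computeIfAbsent(subtaskFor(key, parallelism), i -> new ArrayList<>())
                     .add(key);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // Four distinct (A, B) keys taken from the console output in this thread.
        List<Key> keys = Arrays.asList(
                new Key("352580084349898", "1546559224"),
                new Key("352580084349898", "1546559127"),
                new Key("352580084349898", "1546559234"),
                new Key("352580084349898", "1546559254"));

        // With fewer subtasks than keys, at least one subtask receives several keys.
        route(keys, 2).forEach((subtask, ks) -> System.out.println(subtask + "> " + ks));
    }
}
```

With four distinct keys and a parallelism of 2, at least one subtask index
necessarily appears with more than one key, even though the keys themselves
are different.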

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions

Best,
Congxian


Kumar Bolar, Harshith <hk...@arity.com> wrote on Tue, Jan 29, 2019 at 12:49 PM:

> Typo: lines 1, 2 and 5
>
>
>
> *From: *Harshith Kumar Bolar <hk...@arity.com>
> *Date: *Tuesday, 29 January 2019 at 10:16 AM
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *KeyBy is not creating different keyed streams for different
> keys
>
>
>
> Hi all,
>
>
>
> I'm reading a simple JSON string as input and keying the stream based on
> two fields, A and B. But keyBy is putting records with different values of
> B into the same keyed stream, rather than creating a separate keyed stream
> for each combination of A and B.
>
>
>
> The input:
>
>
>
> {
>     "A": "352580084349898",
>     "B": "1546559127",
>     "C": "A"
> }
>
>
>
> This is the core logic of my Flink code:
>
>
>
> DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
>             .map(new MapFunction<String, GenericDataObject>() {
>                 @Override
>                 public GenericDataObject map(String s) throws Exception {
>                     JSONObject jsonObject = new JSONObject(s);
>                     GenericDataObject genericDataObject = new GenericDataObject();
>                     genericDataObject.setA(jsonObject.getString("A"));
>                     genericDataObject.setB(jsonObject.getString("B"));
>                     genericDataObject.setC(jsonObject.getString("C"));
>                     return genericDataObject;
>                 }
>             });
>
> DataStream<GenericDataObject> testStream = genericDataObjectDataStream
>             .keyBy("A", "B")
>             .map(new MapFunction<GenericDataObject, GenericDataObject>() {
>                 @Override
>                 public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
>                     return genericDataObject;
>                 }
>             });
>
> testStream.print();
>
>
>
> GenericDataObject is a POJO with three fields: A, B and C.
>
>
>
> And this is the console output for different values of field B.
>
>
>
> 5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
> 5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
> 4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
> 3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}
> 5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
>
>
>
> Notice lines 1, 2 and 3. Even though they have different values of B, they
> are being put in the same keyed stream (5). I must be doing something
> fundamentally wrong here.
>
>
>
> Thanks,
>
> Harshith
>
>
>
