Thx for all your replies. Solved the problem by skirting the issue. I
pre-populated the incoming Monitoring Object on intake with the dynamic
runtime fields keyName and keyValue and that way, I could use the static
call as used in all the other if conditions:
  monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId",
"eventName", "component", "keyName","keyValue");

The reason, I want to use Tuple was because I was passing this
KeyedStream<Monitoring,
Tuple> to a common method that could handle the Tuple accordingly.

I tried using  [ keyBy(KeySelector, TypeInformation) ] but the compiler
complained that I need to use Monitoring, Tuple6 in that particular case.

Vijay

On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma <abhioncbr.apa...@gmail.com>
wrote:

> I agree with Timothy, POJO would be a much better approach.
>
> However, If you are trying to build some generic framework and for
> different streams, there would be different fields, you can follow the Map
> approach. For the latter approach, you need to write extra mapper class
> which will convert all the fields in the stream to the Map based stream.
>
> Abhishek
>
> On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor <vict...@gmail.com> wrote:
>
>> Could this just be solved by creating a POJO model class for your problem?
>>
>> That is, instead of using Tuple6 - create a class that encapsulates your
>> data.   This, I think, would solve your problem.  But beyond that I think
>> the code will be more understandable.  It's hard to have a Tuple6 of all
>> Strings, and remember what each one means -- even if I wrote the code :-)
>> Furthermore, if and when you need to add more elements to your data model,
>> you will need to refactor your entire Flink graph.   Keeping a data model
>> in POJO protects against those things.
>>
>> The latter is just unsolicited code review feedback.   And I know I gave
>> it without much context to your problem.  So please take with a large grain
>> of salt, and if it doesn't apply just ignore it.
>>
>> Tim
>>
>>
>> On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> > I tried using  [ keyBy(KeySelector, TypeInformation) ]
>>>
>>> What was the result of this approach?
>>>
>>> On 03/04/2019 17:36, Vijay Balakrishnan wrote:
>>>
>>> Hi Tim,
>>> Thanks for your reply. I am not seeing an option to specify a
>>> .returns(new TypeHint<Tuple6<String,
>>> String,String,String,String,String>>(){}) with KeyedStream ??
>>>
>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>> KeySelector<Monitoring, Tuple>() {
>>>>             public Tuple getKey(Monitoring mon) throws Exception 
>>>> {......return
>>>> new Tuple6<>(..}    })
>>>
>>> I tried using
>>> TypeInformation<Tuple6<String, String, String, String, String, String>>
>>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
>>> String, String, String>>(){});
>>>
>>>> kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info);
>>>> //specify typeInfo through
>>>>
>>>
>>> TIA,
>>> Vijay
>>>
>>> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com> wrote:
>>>
>>>> Flink needs type information for serializing and deserializing objects,
>>>> and that is lost due to Java type erasure.   The only way to workaround
>>>> this is to specify the return type of the function called in the lambda.
>>>>
>>>> Fabian's answer here explains it well.
>>>>
>>>>
>>>> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
>>>>
>>>> Tim
>>>>
>>>> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvija...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am trying to use the KeyedStream with Tuple to handle diffrent types
>>>>> of Tuples including Tuple6.
>>>>> Keep getting the Exception:
>>>>> *Exception in thread "main"
>>>>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of 
>>>>> class
>>>>> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
>>>>> Tuple2, etc.) instead*.
>>>>> Is there a way around Type Erasure here ?
>>>>> I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on
>>>>> to treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.
>>>>>
>>>>> Code below:
>>>>>
>>>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
>>>>>> String keyOperationType = ....;//provided
>>>>>> if (StringUtils.isNotEmpty(keyOperationType)) {
>>>>>>     if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION))
>>>>>> {
>>>>>>         monitoringTupleKeyedStream =
>>>>>> kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
>>>>>>     } else if
>>>>>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>>>>>>         monitoringTupleKeyedStream =
>>>>>> kinesisStream.keyBy("deployment", "gameId", "eventName", "component",
>>>>>> "instance");
>>>>>>     } else if
>>>>>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>>>>>>         TypeInformation<Tuple6<String, String, String, String,
>>>>>> String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String,
>>>>>> String, String, String, String, String>>(){});
>>>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>>>> KeySelector<Monitoring, Tuple>() {
>>>>>>             public Tuple getKey(Monitoring mon) throws Exception {
>>>>>>                 String key = "";
>>>>>>                 String keyName = "";
>>>>>>                 final String eventName = mon.getEventName();
>>>>>>                 if (eventName != null &&
>>>>>> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
>>>>>>                 )) {
>>>>>>                     keyName = PCAM_ID;
>>>>>>                     key = mon.getEventDataMap() != null ? (String)
>>>>>> mon.getEventDataMap().get(PCAM_ID) : "";
>>>>>>                 } else if (eventName != null &&
>>>>>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>>>>>>                     keyName = OUT_BITRATE;
>>>>>>                     key = mon.getEventDataMap() != null ? (String)
>>>>>> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>>>>                 }
>>>>>>                 mon.setKeyName(keyName);
>>>>>>                 mon.setKeyValue(key);
>>>>>>                 return new Tuple6<>(mon.getDeployment(),
>>>>>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
>>>>>> mon.getKeyValue());
>>>>>>             }
>>>>>>         }); //, info)
>>>>>>     } else if
>>>>>> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>>>>>>         monitoringTupleKeyedStream =
>>>>>> kinesisStream.keyBy("deployment", "gameId", "eventName", "component",
>>>>>> "instance", "container"); //<== this is also a Tuple6 but no complaints ?
>>>>>>     }
>>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> This example below needs monitoringTupleKeyedStream  to be
>>>>> KeyedStream<Monitoring, Tuple6<String, String, String, String, String,
>>>>> String>>
>>>>>
>>>>>> TypeInformation<Tuple6<String, String, String, String, String,
>>>>>> String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String,
>>>>>> String, String, String, String>>(){});
>>>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>>>> KeySelector<Monitoring, Tuple6<String, String, String, String, String,
>>>>>> String>>() {
>>>>>>                     @Override
>>>>>>                     public Tuple6<String, String, String, String,
>>>>>> String, String> getKey(Monitoring mon) throws Exception {
>>>>>>                         String key = "";
>>>>>>                         String keyName = "";
>>>>>>                         //TODO: extract to a method to pull key to
>>>>>> use from a config file
>>>>>>                         final String eventName = mon.getEventName();
>>>>>>                         if (eventName != null &&
>>>>>> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
>>>>>>                         )) {
>>>>>>                             keyName = PCAM_ID;
>>>>>>                             key = mon.getEventDataMap() != null ?
>>>>>> (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>>>                         } else if (eventName != null &&
>>>>>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>>>>>>                             keyName = OUT_BITRATE;
>>>>>>                             key = mon.getEventDataMap() != null ?
>>>>>> (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify 
>>>>>> key
>>>>>> to use
>>>>>>                         }
>>>>>>                         mon.setKeyName(keyName);
>>>>>>                         mon.setKeyValue(key);
>>>>>>                         return new Tuple6<>(mon.getDeployment(),
>>>>>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
>>>>>> mon.getKeyValue());
>>>>>>                     }
>>>>>>                 }, info);
>>>>>
>>>>>
>>>>> TIA
>>>>>
>>>>
>>>

Reply via email to