Could this just be solved by creating a POJO model class for your problem?

That is, instead of using Tuple6 - create a class that encapsulates your
data.   This, I think, would solve your problem.  But beyond that I think
the code will be more understandable.  It's hard to have a Tuple6 of all
Strings, and remember what each one means -- even if I wrote the code :-)
Furthermore, if and when you need to add more elements to your data model,
you will need to refactor your entire Flink graph.   Keeping a data model
in POJO protects against those things.

The latter is just unsolicited code review feedback.   And I know I gave it
without much context to your problem.  So please take with a large grain of
salt, and if it doesn't apply just ignore it.

Tim


On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler <ches...@apache.org> wrote:

> > I tried using  [ keyBy(KeySelector, TypeInformation) ]
>
> What was the result of this approach?
>
> On 03/04/2019 17:36, Vijay Balakrishnan wrote:
>
> Hi Tim,
> Thanks for your reply. I am not seeing an option to specify a .returns(new
> TypeHint<Tuple6<String, String,String,String,String,String>>(){}) with
> KeyedStream ??
>
>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>> KeySelector<Monitoring, Tuple>() {
>>             public Tuple getKey(Monitoring mon) throws Exception 
>> {......return
>> new Tuple6<>(..}    })
>
> I tried using
> TypeInformation<Tuple6<String, String, String, String, String, String>>
> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
> String, String, String>>(){});
>
>> kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info);
>> //specify typeInfo through
>>
>
> TIA,
> Vijay
>
> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com> wrote:
>
>> Flink needs type information for serializing and deserializing objects,
>> and that is lost due to Java type erasure.   The only way to workaround
>> this is to specify the return type of the function called in the lambda.
>>
>> Fabian's answer here explains it well.
>>
>>
>> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
>>
>> Tim
>>
>> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am trying to use the KeyedStream with Tuple to handle diffrent types
>>> of Tuples including Tuple6.
>>> Keep getting the Exception:
>>> *Exception in thread "main"
>>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
>>> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
>>> Tuple2, etc.) instead*.
>>> Is there a way around Type Erasure here ?
>>> I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on to
>>> treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.
>>>
>>> Code below:
>>>
>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
>>>> String keyOperationType = ....;//provided
>>>> if (StringUtils.isNotEmpty(keyOperationType)) {
>>>>     if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>>>> "gameId", "eventName", "component");
>>>>     } else if
>>>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>>>> "gameId", "eventName", "component", "instance");
>>>>     } else if
>>>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>>>>         TypeInformation<Tuple6<String, String, String, String, String,
>>>> String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String,
>>>> String, String, String, String>>(){});
>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>> KeySelector<Monitoring, Tuple>() {
>>>>             public Tuple getKey(Monitoring mon) throws Exception {
>>>>                 String key = "";
>>>>                 String keyName = "";
>>>>                 final String eventName = mon.getEventName();
>>>>                 if (eventName != null &&
>>>> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
>>>>                 )) {
>>>>                     keyName = PCAM_ID;
>>>>                     key = mon.getEventDataMap() != null ? (String)
>>>> mon.getEventDataMap().get(PCAM_ID) : "";
>>>>                 } else if (eventName != null &&
>>>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>>>>                     keyName = OUT_BITRATE;
>>>>                     key = mon.getEventDataMap() != null ? (String)
>>>> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>>                 }
>>>>                 mon.setKeyName(keyName);
>>>>                 mon.setKeyValue(key);
>>>>                 return new Tuple6<>(mon.getDeployment(),
>>>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
>>>> mon.getKeyValue());
>>>>             }
>>>>         }); //, info)
>>>>     } else if
>>>> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>>>> "gameId", "eventName", "component", "instance", "container"); //<== this is
>>>> also a Tuple6 but no complaints ?
>>>>     }
>>>> }
>>>
>>>
>>>
>>> This example below needs monitoringTupleKeyedStream  to be
>>> KeyedStream<Monitoring, Tuple6<String, String, String, String, String,
>>> String>>
>>>
>>>> TypeInformation<Tuple6<String, String, String, String, String, String>>
>>>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
>>>> String, String, String>>(){});
>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>> KeySelector<Monitoring, Tuple6<String, String, String, String, String,
>>>> String>>() {
>>>>                     @Override
>>>>                     public Tuple6<String, String, String, String,
>>>> String, String> getKey(Monitoring mon) throws Exception {
>>>>                         String key = "";
>>>>                         String keyName = "";
>>>>                         //TODO: extract to a method to pull key to use
>>>> from a config file
>>>>                         final String eventName = mon.getEventName();
>>>>                         if (eventName != null &&
>>>> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
>>>>                         )) {
>>>>                             keyName = PCAM_ID;
>>>>                             key = mon.getEventDataMap() != null ?
>>>> (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>                         } else if (eventName != null &&
>>>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>>>>                             keyName = OUT_BITRATE;
>>>>                             key = mon.getEventDataMap() != null ?
>>>> (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key
>>>> to use
>>>>                         }
>>>>                         mon.setKeyName(keyName);
>>>>                         mon.setKeyValue(key);
>>>>                         return new Tuple6<>(mon.getDeployment(),
>>>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
>>>> mon.getKeyValue());
>>>>                     }
>>>>                 }, info);
>>>
>>>
>>> TIA
>>>
>>
>

Reply via email to