Hi Tim,
Thanks for your reply. I don't see a way to specify .returns(new
TypeHint<Tuple6<String, String, String, String, String, String>>(){}) on a
KeyedStream.

> monitoringTupleKeyedStream = kinesisStream.keyBy(new
> KeySelector<Monitoring, Tuple>() {
>             public Tuple getKey(Monitoring mon) throws Exception {......return
> new Tuple6<>(..}    })

I tried using
TypeInformation<Tuple6<String, String, String, String, String, String>>
info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
String, String, String>>(){});

> kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info);
> //specify typeInfo through
>
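
To make the erasure point concrete, here is a minimal sketch in plain Java
with no Flink dependency. Hint is a hypothetical stand-in written only for
illustration; it is not Flink's TypeHint class, it just mimics the
anonymous-subclass trick of reading the generic superclass via reflection.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

class ErasureDemo {

    // Hypothetical stand-in for a type hint (NOT Flink's TypeHint class).
    // An anonymous subclass such as `new Hint<List<String>>() {}` stores its
    // generic superclass in the class file, so reflection can read it back.
    abstract static class Hint<T> {
        final Type captured;

        protected Hint() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }
    }

    // Returns the type argument the hint was declared with.
    static String describe(Hint<?> hint) {
        return hint.captured.getTypeName();
    }

    // Both lists share one runtime class; the element type is erased.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());                     // true
        System.out.println(describe(new Hint<List<String>>() {}));  // java.util.List<java.lang.String>
        System.out.println(describe(new Hint<Number>() {}));        // java.lang.Number
    }
}
```

The last line is the case that seems to match mine: a hint declared with a
base type only ever reports the base type. My selector is declared as
KeySelector<Monitoring, Tuple>, so I assume the same mechanism leaves Flink
with only the abstract Tuple, which would be consistent with the
InvalidTypesException.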

TIA,
Vijay

On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com> wrote:

> Flink needs type information for serializing and deserializing objects,
> and that is lost due to Java type erasure. The only way to work around
> this is to specify the return type of the function called in the lambda.
>
> Fabian's answer here explains it well.
>
>
> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
>
> Tim
>
> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi,
>> I am trying to use a KeyedStream keyed by Tuple to handle different types
>> of Tuples, including Tuple6.
>> I keep getting this exception:
>> *Exception in thread "main"
>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
>> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
>> Tuple2, etc.) instead*.
>> Is there a way around Type Erasure here ?
>> I want to use KeyedStream<Monitoring, Tuple> so that a Tuple6 key can be
>> treated as a generic Tuple, as with monitoringTupleKeyedStream below.
>>
>> Code below:
>>
>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
>>> String keyOperationType = ....;//provided
>>> if (StringUtils.isNotEmpty(keyOperationType)) {
>>>     if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>>> "gameId", "eventName", "component");
>>>     } else if
>>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>>> "gameId", "eventName", "component", "instance");
>>>     } else if
>>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>>>         TypeInformation<Tuple6<String, String, String, String, String,
>>> String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String,
>>> String, String, String, String>>(){});
>>>         monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>> KeySelector<Monitoring, Tuple>() {
>>>             public Tuple getKey(Monitoring mon) throws Exception {
>>>                 String key = "";
>>>                 String keyName = "";
>>>                 final String eventName = mon.getEventName();
>>>                 if (eventName != null &&
>>> (eventName.equalsIgnoreCase(INGRESS_FPS))) {
>>>                     keyName = PCAM_ID;
>>>                     key = mon.getEventDataMap() != null ? (String)
>>> mon.getEventDataMap().get(PCAM_ID) : "";
>>>                 } else if (eventName != null &&
>>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>>>                     keyName = OUT_BITRATE;
>>>                     key = mon.getEventDataMap() != null ? (String)
>>> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>                 }
>>>                 mon.setKeyName(keyName);
>>>                 mon.setKeyValue(key);
>>>                 return new Tuple6<>(mon.getDeployment(),
>>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
>>> mon.getKeyValue());
>>>             }
>>>         }); //, info)
>>>     } else if
>>> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>>> "gameId", "eventName", "component", "instance", "container"); //<== this is
>>> also a Tuple6 but no complaints ?
>>>     }
>>> }
>>
>>
>>
>> The example below needs monitoringTupleKeyedStream to be declared as
>> KeyedStream<Monitoring, Tuple6<String, String, String, String, String,
>> String>>
>>
>>> TypeInformation<Tuple6<String, String, String, String, String, String>>
>>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
>>> String, String, String>>(){});
>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>> KeySelector<Monitoring, Tuple6<String, String, String, String, String,
>>> String>>() {
>>>                     @Override
>>>                     public Tuple6<String, String, String, String,
>>> String, String> getKey(Monitoring mon) throws Exception {
>>>                         String key = "";
>>>                         String keyName = "";
>>>                         //TODO: extract to a method to pull key to use
>>> from a config file
>>>                         final String eventName = mon.getEventName();
>>>                         if (eventName != null &&
>>> (eventName.equalsIgnoreCase(INGRESS_FPS))) {
>>>                             keyName = PCAM_ID;
>>>                             key = mon.getEventDataMap() != null ?
>>> (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>                         } else if (eventName != null &&
>>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>>>                             keyName = OUT_BITRATE;
>>>                             key = mon.getEventDataMap() != null ?
>>> (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key
>>> to use
>>>                         }
>>>                         mon.setKeyName(keyName);
>>>                         mon.setKeyValue(key);
>>>                         return new Tuple6<>(mon.getDeployment(),
>>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
>>> mon.getKeyValue());
>>>                     }
>>>                 }, info);
>>
>>
>> TIA
>>
>
