Hi,
I am trying to use a KeyedStream keyed by Tuple so that one stream variable
can hold keys of different Tuple types, including Tuple6.
I keep getting this exception:
*Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
Tuple2, etc.) instead*.
Is there a way around type erasure here?
I want to declare the stream as KeyedStream<Monitoring, Tuple> so that a
Tuple6 key is treated as a plain Tuple and the result can be assigned to
monitoringTupleKeyedStream, like the other keyBy results.

Code below:

> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
> String keyOperationType = ....;//provided
> if (StringUtils.isNotEmpty(keyOperationType)) {
>     if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
> "gameId", "eventName", "component");
>     } else if
> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
> "gameId", "eventName", "component", "instance");
>     } else if
> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>         TypeInformation<Tuple6<String, String, String, String, String,
> String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String,
> String, String, String, String>>(){});
>         monitoringTupleKeyedStream = kinesisStream.keyBy(new
> KeySelector<Monitoring, Tuple>() {
>             public Tuple getKey(Monitoring mon) throws Exception {
>                 String key = "";
>                 String keyName = "";
>                 final String eventName = mon.getEventName();
>                 if (eventName != null &&
> eventName.equalsIgnoreCase(INGRESS_FPS)) {
>                     keyName = PCAM_ID;
>                     key = mon.getEventDataMap() != null ? (String)
> mon.getEventDataMap().get(PCAM_ID) : "";
>                 } else if (eventName != null &&
> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>                     keyName = OUT_BITRATE;
>                     key = mon.getEventDataMap() != null ? (String)
> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>                 }
>                 mon.setKeyName(keyName);
>                 mon.setKeyValue(key);
>                 return new Tuple6<>(mon.getDeployment(), mon.getGameId(),
> eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>             }
>         }); //, info)
>     } else if
> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>         // This is also a Tuple6, but there are no complaints here?
>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
> "gameId", "eventName", "component", "instance", "container");
>     }
> }



The example below requires monitoringTupleKeyedStream to be declared as
KeyedStream<Monitoring, Tuple6<String, String, String, String, String,
String>>:

> TypeInformation<Tuple6<String, String, String, String, String, String>>
> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
> String, String, String>>(){});
> monitoringTupleKeyedStream = kinesisStream.keyBy(new
> KeySelector<Monitoring, Tuple6<String, String, String, String, String,
> String>>() {
>                     @Override
>                     public Tuple6<String, String, String, String, String,
> String> getKey(Monitoring mon) throws Exception {
>                         String key = "";
>                         String keyName = "";
>                         //TODO: extract to a method to pull key to use from a config file
>                         final String eventName = mon.getEventName();
>                         if (eventName != null &&
> eventName.equalsIgnoreCase(INGRESS_FPS)) {
>                             keyName = PCAM_ID;
>                             key = mon.getEventDataMap() != null ? (String)
> mon.getEventDataMap().get(PCAM_ID) : "";
>                         } else if (eventName != null &&
> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>                             keyName = OUT_BITRATE;
>                             key = mon.getEventDataMap() != null ? (String)
> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>                         }
>                         mon.setKeyName(keyName);
>                         mon.setKeyValue(key);
>                         return new Tuple6<>(mon.getDeployment(),
> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
> mon.getKeyValue());
>                     }
>                 }, info);
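
To make the erasure question concrete, here is a minimal plain-Java sketch with no Flink on the classpath. The KeySelector, Tuple and Tuple2 types in it are hypothetical stand-ins that I wrote for illustration, not the Flink classes of the same name. It only shows what reflection can recover from an anonymous selector class: with the key declared as the base Tuple, the field types are not recorded anywhere, while with a concrete tuple type they are. I assume Flink's type extraction runs into something similar, but I have not verified that.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class ErasureDemo {
    // Hypothetical stand-ins for illustration only, NOT the Flink classes.
    interface KeySelector<IN, KEY> { KEY getKey(IN value); }
    static class Tuple {}
    static class Tuple2<A, B> extends Tuple {
        final A f0;
        final B f1;
        Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Reads the KEY type argument recorded in the selector's class metadata.
    // Assumes the selector class directly implements KeySelector with type
    // arguments, as the anonymous classes below do.
    static Type declaredKeyType(KeySelector<?, ?> selector) {
        Type iface = selector.getClass().getGenericInterfaces()[0];
        return ((ParameterizedType) iface).getActualTypeArguments()[1];
    }

    // Key declared as the base class: only "Tuple" is recorded.
    static KeySelector<String, Tuple> abstractKey() {
        return new KeySelector<String, Tuple>() {
            @Override
            public Tuple getKey(String s) { return new Tuple2<>(s, s.length()); }
        };
    }

    // Key declared as a concrete tuple: the field types are recorded too.
    static KeySelector<String, Tuple2<String, Integer>> concreteKey() {
        return new KeySelector<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> getKey(String s) {
                return new Tuple2<>(s, s.length());
            }
        };
    }

    public static void main(String[] args) {
        // A plain Class object: no arity, no field types.
        System.out.println(declaredKeyType(abstractKey()));
        // A ParameterizedType carrying String and Integer.
        System.out.println(declaredKeyType(concreteKey()));
    }
}
```

Both selectors return the same runtime object, so the difference is only in the declared key type, which is what I would like to keep abstract if that is possible.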


TIA
