Hi, I am trying to use a KeyedStream keyed by Tuple to handle different types of Tuples, including Tuple6. I keep getting the exception: *Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead*. Is there a way around type erasure here? I want to declare the stream as KeyedStream<Monitoring, Tuple> so that the Tuple6 key returned by my KeySelector is treated as a plain Tuple and can be assigned to monitoringTupleKeyedStream, just like the results of the field-name keyBy calls.
Code below: KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null; > String keyOperationType = ....;//provided > if (StringUtils.isNotEmpty(keyOperationType)) { > if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) { > monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", > "gameId", "eventName", "component"); > } else if > (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) { > monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", > "gameId", "eventName", "component", "instance"); > } else if > (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) { > TypeInformation<Tuple6<String, String, String, String, String, > String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, > String, String, String, String>>(){}); > monitoringTupleKeyedStream = kinesisStream.keyBy(new > KeySelector<Monitoring, Tuple>() { > public Tuple getKey(Monitoring mon) throws Exception { > String key = ""; > String keyName = ""; > final String eventName = mon.getEventName(); > if (eventName != null && > ((eventName.equalsIgnoreCase(INGRESS_FPS))) > )) { > keyName = PCAM_ID; > key = mon.getEventDataMap() != null ? (String) > mon.getEventDataMap().get(PCAM_ID) : ""; > } else if (eventName != null && > (eventName.equalsIgnoreCase(EGRESS_FPS))) { > keyName = OUT_BITRATE; > key = mon.getEventDataMap() != null ? (String) > mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use > } > mon.setKeyName(keyName); > mon.setKeyValue(key); > return new Tuple6<>(mon.getDeployment(), mon.getGameId(), > eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue()); > } > }); //, info) > } else if > (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) { > monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", > "gameId", "eventName", "component", "instance", "container"); //<== this is > also a Tuple6 but no complaints ? 
> } > } This example below needs monitoringTupleKeyedStream to be KeyedStream<Monitoring, Tuple6<String, String, String, String, String, String>> > TypeInformation<Tuple6<String, String, String, String, String, String>> > info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, > String, String, String>>(){}); > monitoringTupleKeyedStream = kinesisStream.keyBy(new > KeySelector<Monitoring, Tuple6<String, String, String, String, String, > String>>() { > @Override > public Tuple6<String, String, String, String, String, > String> getKey(Monitoring mon) throws Exception { > String key = ""; > String keyName = ""; > //TODO: extract to a method to pull key to use > from a config file > final String eventName = mon.getEventName(); > if (eventName != null && > ((eventName.equalsIgnoreCase(INGRESS_FPS))) > )) { > keyName = PCAM_ID; > key = mon.getEventDataMap() != null ? (String) > mon.getEventDataMap().get(PCAM_ID) : ""; > } else if (eventName != null && > (eventName.equalsIgnoreCase(EGRESS_FPS))) { > keyName = OUT_BITRATE; > key = mon.getEventDataMap() != null ? (String) > mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use > } > mon.setKeyName(keyName); > mon.setKeyValue(key); > return new Tuple6<>(mon.getDeployment(), > mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), > mon.getKeyValue()); > } > }, info); TIA