Hi Tim, Thanks for your reply. I am not seeing an option to specify a .returns(new TypeHint<Tuple6<String, String,String,String,String,String>>(){}) with KeyedStream ??
> monitoringTupleKeyedStream = kinesisStream.keyBy(new > KeySelector<Monitoring, Tuple>() { > public Tuple getKey(Monitoring mon) throws Exception {......return > new Tuple6<>(..} }) I tried using TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){}); > kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info); > //specify typeInfo through > TIA, Vijay On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com> wrote: > Flink needs type information for serializing and deserializing objects, > and that is lost due to Java type erasure. The only way to workaround > this is to specify the return type of the function called in the lambda. > > Fabian's answer here explains it well. > > > https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554 > > Tim > > On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvija...@gmail.com> > wrote: > >> Hi, >> I am trying to use the KeyedStream with Tuple to handle diffrent types of >> Tuples including Tuple6. >> Keep getting the Exception: >> *Exception in thread "main" >> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class >> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, >> Tuple2, etc.) instead*. >> Is there a way around Type Erasure here ? >> I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on to >> treat Tuple6 as a Tuple like the monitoringTupleKeyedStream. >> >> Code below: >> >> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null; >>> String keyOperationType = ....;//provided >>> if (StringUtils.isNotEmpty(keyOperationType)) { >>> if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) { >>> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", >>> "gameId", "eventName", "component"); >>> } else if >>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) { >>> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", >>> "gameId", "eventName", "component", "instance"); >>> } else if >>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) { >>> TypeInformation<Tuple6<String, String, String, String, String, >>> String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, >>> String, String, String, String>>(){}); >>> monitoringTupleKeyedStream = kinesisStream.keyBy(new >>> KeySelector<Monitoring, Tuple>() { >>> public Tuple getKey(Monitoring mon) throws Exception { >>> String key = ""; >>> String keyName = ""; >>> final String eventName = mon.getEventName(); >>> if (eventName != null && >>> ((eventName.equalsIgnoreCase(INGRESS_FPS))) >>> )) { >>> keyName = PCAM_ID; >>> key = mon.getEventDataMap() != null ? (String) >>> mon.getEventDataMap().get(PCAM_ID) : ""; >>> } else if (eventName != null && >>> (eventName.equalsIgnoreCase(EGRESS_FPS))) { >>> keyName = OUT_BITRATE; >>> key = mon.getEventDataMap() != null ? (String) >>> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use >>> } >>> mon.setKeyName(keyName); >>> mon.setKeyValue(key); >>> return new Tuple6<>(mon.getDeployment(), >>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), >>> mon.getKeyValue()); >>> } >>> }); //, info) >>> } else if >>> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) { >>> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", >>> "gameId", "eventName", "component", "instance", "container"); //<== this is >>> also a Tuple6 but no complaints ? >>> } >>> } >> >> >> >> This example below needs monitoringTupleKeyedStream to be >> KeyedStream<Monitoring, Tuple6<String, String, String, String, String, >> String>> >> >>> TypeInformation<Tuple6<String, String, String, String, String, String>> >>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, >>> String, String, String>>(){}); >>> monitoringTupleKeyedStream = kinesisStream.keyBy(new >>> KeySelector<Monitoring, Tuple6<String, String, String, String, String, >>> String>>() { >>> @Override >>> public Tuple6<String, String, String, String, >>> String, String> getKey(Monitoring mon) throws Exception { >>> String key = ""; >>> String keyName = ""; >>> //TODO: extract to a method to pull key to use >>> from a config file >>> final String eventName = mon.getEventName(); >>> if (eventName != null && >>> ((eventName.equalsIgnoreCase(INGRESS_FPS))) >>> )) { >>> keyName = PCAM_ID; >>> key = mon.getEventDataMap() != null ? >>> (String) mon.getEventDataMap().get(PCAM_ID) : ""; >>> } else if (eventName != null && >>> (eventName.equalsIgnoreCase(EGRESS_FPS))) { >>> keyName = OUT_BITRATE; >>> key = mon.getEventDataMap() != null ? >>> (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key >>> to use >>> } >>> mon.setKeyName(keyName); >>> mon.setKeyValue(key); >>> return new Tuple6<>(mon.getDeployment(), >>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), >>> mon.getKeyValue()); >>> } >>> }, info); >> >> >> TIA >> >