Could this be solved simply by creating a POJO model class for your problem? That is, instead of using Tuple6, create a class that encapsulates your data. I think that would solve your problem, and beyond that it would make the code more understandable. It's hard to work with a Tuple6 of all Strings and remember what each one means, even if I wrote the code myself :-) Furthermore, if and when you need to add more elements to your data model, you will have to refactor your entire Flink graph. Keeping the data model in a POJO protects against both problems.
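To make the suggestion concrete, here is a minimal sketch of such a key class. The class name MonitoringKey is something I made up; the six fields simply mirror the values your KeySelector packs into the Tuple6. I have not run this against your job, so treat it as an illustration under those assumptions rather than a drop-in fix.

```java
import java.util.Objects;

/**
 * Hypothetical POJO key replacing Tuple6<String, String, String, String, String, String>.
 * Assumption: Flink treats a public class with a public no-arg constructor and
 * public (or getter/setter) fields as a POJO, and a key type must override hashCode().
 */
public class MonitoringKey {
    public String deployment;
    public String gameId;
    public String eventName;
    public String component;
    public String keyName;
    public String keyValue;

    /** No-arg constructor, needed for POJO-style serialization. */
    public MonitoringKey() {}

    public MonitoringKey(String deployment, String gameId, String eventName,
                         String component, String keyName, String keyValue) {
        this.deployment = deployment;
        this.gameId = gameId;
        this.eventName = eventName;
        this.component = component;
        this.keyName = keyName;
        this.keyValue = keyValue;
    }

    /** Two keys are equal when all six fields are equal (null-safe). */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof MonitoringKey)) {
            return false;
        }
        MonitoringKey that = (MonitoringKey) other;
        return Objects.equals(deployment, that.deployment)
                && Objects.equals(gameId, that.gameId)
                && Objects.equals(eventName, that.eventName)
                && Objects.equals(component, that.component)
                && Objects.equals(keyName, that.keyName)
                && Objects.equals(keyValue, that.keyValue);
    }

    /** Hash derived from the same six fields, so equal keys share a hash code. */
    @Override
    public int hashCode() {
        return Objects.hash(deployment, gameId, eventName, component, keyName, keyValue);
    }

    // Assumed usage in the job (not compiled here, since it needs Flink on the classpath):
    //   KeyedStream<Monitoring, MonitoringKey> keyed =
    //       kinesisStream.keyBy(mon -> new MonitoringKey(mon.getDeployment(), mon.getGameId(),
    //           mon.getEventName(), mon.getComponent(), mon.getKeyName(), mon.getKeyValue()));
}
```

Because the key type is a concrete, non-generic class, there should be nothing for type erasure to hide, and adding a seventh field later only touches this class.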
The latter is just unsolicited code review feedback, and I know I gave it without much context for your problem. So please take it with a large grain of salt, and if it doesn't apply, just ignore it.

Tim

On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler <ches...@apache.org> wrote:

>
> I tried using [ keyBy(KeySelector, TypeInformation) ]
>
> What was the result of this approach?
>
> On 03/04/2019 17:36, Vijay Balakrishnan wrote:
>
> Hi Tim,
> Thanks for your reply. I am not seeing an option to specify a .returns(new
> TypeHint<Tuple6<String, String, String, String, String, String>>(){}) with
> KeyedStream ??
>
>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>> KeySelector<Monitoring, Tuple>() {
>> public Tuple getKey(Monitoring mon) throws Exception
>> {......return
>> new Tuple6<>(..} })
>
> I tried using
> TypeInformation<Tuple6<String, String, String, String, String, String>>
> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
> String, String, String>>(){});
>
>> kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info);
>> //specify typeInfo through
>>
>
> TIA,
> Vijay
>
> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com> wrote:
>
>> Flink needs type information for serializing and deserializing objects,
>> and that is lost due to Java type erasure. The only way to work around
>> this is to specify the return type of the function called in the lambda.
>>
>> Fabian's answer here explains it well.
>>
>>
>> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
>>
>> Tim
>>
>> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am trying to use the KeyedStream with Tuple to handle different types
>>> of Tuples including Tuple6.
>>> Keep getting the Exception:
>>> *Exception in thread "main"
>>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
>>> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
>>> Tuple2, etc.) instead*.
>>> Is there a way around Type Erasure here ?
>>> I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on to
>>> treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.
>>>
>>> Code below:
>>>
>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
>>>> String keyOperationType = ....;//provided
>>>> if (StringUtils.isNotEmpty(keyOperationType)) {
>>>>     if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
>>>>     } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance");
>>>>     } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>>>>         TypeInformation<Tuple6<String, String, String, String, String, String>> info =
>>>>             TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
>>>>             public Tuple getKey(Monitoring mon) throws Exception {
>>>>                 String key = "";
>>>>                 String keyName = "";
>>>>                 final String eventName = mon.getEventName();
>>>>                 if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
>>>>                     keyName = PCAM_ID;
>>>>                     key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>                 } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
>>>>                     keyName = OUT_BITRATE;
>>>>                     key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>>                 }
>>>>                 mon.setKeyName(keyName);
>>>>                 mon.setKeyValue(key);
>>>>                 return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>>>             }
>>>>         }); //, info)
>>>>     } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance", "container"); //<== this is also a Tuple6 but no complaints ?
>>>>     }
>>>> }
>>>
>>>
>>>
>>> This example below needs monitoringTupleKeyedStream to be
>>> KeyedStream<Monitoring, Tuple6<String, String, String, String, String,
>>> String>>
>>>
>>>> TypeInformation<Tuple6<String, String, String, String, String, String>> info =
>>>>     TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
>>>>     @Override
>>>>     public Tuple6<String, String, String, String, String, String> getKey(Monitoring mon) throws Exception {
>>>>         String key = "";
>>>>         String keyName = "";
>>>>         //TODO: extract to a method to pull key to use from a config file
>>>>         final String eventName = mon.getEventName();
>>>>         if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
>>>>             keyName = PCAM_ID;
>>>>             key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>         } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
>>>>             keyName = OUT_BITRATE;
>>>>             key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>>         }
>>>>         mon.setKeyName(keyName);
>>>>         mon.setKeyValue(key);
>>>>         return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>>>     }
>>>> }, info);
>>>
>>>
>>> TIA
>>>
>>
>
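
On the type erasure question in the quoted thread: a small plain-Java sketch (no Flink on the classpath) of why an anonymous subclass such as `new TypeHint<...>(){}` can carry the full generic type while an ordinary object cannot. TypeToken here is a hypothetical stand-in I wrote for illustration; it is not Flink's TypeHint, whose internals may differ.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    /** Hypothetical stand-in for the role a type hint plays; not a Flink class. */
    public abstract static class TypeToken<T> {
        /** Reads the type argument recorded in the anonymous subclass's class file. */
        public Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    /** At runtime an ArrayList<String> only knows it is an ArrayList. */
    public static String erasedClassName() {
        List<String> strings = new ArrayList<String>();
        return strings.getClass().getName();
    }

    /** The trailing {} creates a subclass whose generic superclass survives erasure. */
    public static String capturedTypeName() {
        Type type = new TypeToken<List<String>>() {}.captured();
        return type.getTypeName();
    }
}
```

The same reasoning suggests why a KeySelector declared with the abstract Tuple as its key type gives Flink nothing concrete to extract, whereas one declared with the full Tuple6 type parameters does.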