Thx for all your replies. Solved the problem by skirting the issue. I pre-populated the incoming Monitoring Object on intake with the dynamic runtime fields keyName and keyValue and that way, I could use the static call as used in all the other if conditions: monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "keyName","keyValue");
The reason, I want to use Tuple was because I was passing this KeyedStream<Monitoring, Tuple> to a common method that could handle the Tuple accordingly. I tried using [ keyBy(KeySelector, TypeInformation) ] but the compiler complained that I need to use Monitoring, Tuple6 in that particular case. Vijay On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma <abhioncbr.apa...@gmail.com> wrote: > I agree with Timothy, POJO would be a much better approach. > > However, If you are trying to build some generic framework and for > different streams, there would be different fields, you can follow the Map > approach. For the latter approach, you need to write extra mapper class > which will convert all the fields in the stream to the Map based stream. > > Abhishek > > On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor <vict...@gmail.com> wrote: > >> Could this just be solved by creating a POJO model class for your problem? >> >> That is, instead of using Tuple6 - create a class that encapsulates your >> data. This, I think, would solve your problem. But beyond that I think >> the code will be more understandable. It's hard to have a Tuple6 of all >> Strings, and remember what each one means -- even if I wrote the code :-) >> Furthermore, if and when you need to add more elements to your data model, >> you will need to refactor your entire Flink graph. Keeping a data model >> in POJO protects against those things. >> >> The latter is just unsolicited code review feedback. And I know I gave >> it without much context to your problem. So please take with a large grain >> of salt, and if it doesn't apply just ignore it. >> >> Tim >> >> >> On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler <ches...@apache.org> >> wrote: >> >>> > I tried using [ keyBy(KeySelector, TypeInformation) ] >>> >>> What was the result of this approach? >>> >>> On 03/04/2019 17:36, Vijay Balakrishnan wrote: >>> >>> Hi Tim, >>> Thanks for your reply. I am not seeing an option to specify a >>> .returns(new TypeHint<Tuple6<String, >>> String,String,String,String,String>>(){}) with KeyedStream ?? >>> >>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new >>>> KeySelector<Monitoring, Tuple>() { >>>> public Tuple getKey(Monitoring mon) throws Exception >>>> {......return >>>> new Tuple6<>(..} }) >>> >>> I tried using >>> TypeInformation<Tuple6<String, String, String, String, String, String>> >>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, >>> String, String, String>>(){}); >>> >>>> kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info); >>>> //specify typeInfo through >>>> >>> >>> TIA, >>> Vijay >>> >>> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com> wrote: >>> >>>> Flink needs type information for serializing and deserializing objects, >>>> and that is lost due to Java type erasure. The only way to workaround >>>> this is to specify the return type of the function called in the lambda. >>>> >>>> Fabian's answer here explains it well. >>>> >>>> >>>> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554 >>>> >>>> Tim >>>> >>>> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvija...@gmail.com> >>>> wrote: >>>> >>>>> Hi, >>>>> I am trying to use the KeyedStream with Tuple to handle diffrent types >>>>> of Tuples including Tuple6. >>>>> Keep getting the Exception: >>>>> *Exception in thread "main" >>>>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of >>>>> class >>>>> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, >>>>> Tuple2, etc.) instead*. >>>>> Is there a way around Type Erasure here ? >>>>> I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on >>>>> to treat Tuple6 as a Tuple like the monitoringTupleKeyedStream. >>>>> >>>>> Code below: >>>>> >>>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null; >>>>>> String keyOperationType = ....;//provided >>>>>> if (StringUtils.isNotEmpty(keyOperationType)) { >>>>>> if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) >>>>>> { >>>>>> monitoringTupleKeyedStream = >>>>>> kinesisStream.keyBy("deployment", "gameId", "eventName", "component"); >>>>>> } else if >>>>>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) { >>>>>> monitoringTupleKeyedStream = >>>>>> kinesisStream.keyBy("deployment", "gameId", "eventName", "component", >>>>>> "instance"); >>>>>> } else if >>>>>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) { >>>>>> TypeInformation<Tuple6<String, String, String, String, >>>>>> String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, >>>>>> String, String, String, String, String>>(){}); >>>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new >>>>>> KeySelector<Monitoring, Tuple>() { >>>>>> public Tuple getKey(Monitoring mon) throws Exception { >>>>>> String key = ""; >>>>>> String keyName = ""; >>>>>> final String eventName = mon.getEventName(); >>>>>> if (eventName != null && >>>>>> ((eventName.equalsIgnoreCase(INGRESS_FPS))) >>>>>> )) { >>>>>> keyName = PCAM_ID; >>>>>> key = mon.getEventDataMap() != null ? (String) >>>>>> mon.getEventDataMap().get(PCAM_ID) : ""; >>>>>> } else if (eventName != null && >>>>>> (eventName.equalsIgnoreCase(EGRESS_FPS))) { >>>>>> keyName = OUT_BITRATE; >>>>>> key = mon.getEventDataMap() != null ? (String) >>>>>> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use >>>>>> } >>>>>> mon.setKeyName(keyName); >>>>>> mon.setKeyValue(key); >>>>>> return new Tuple6<>(mon.getDeployment(), >>>>>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), >>>>>> mon.getKeyValue()); >>>>>> } >>>>>> }); //, info) >>>>>> } else if >>>>>> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) { >>>>>> monitoringTupleKeyedStream = >>>>>> kinesisStream.keyBy("deployment", "gameId", "eventName", "component", >>>>>> "instance", "container"); //<== this is also a Tuple6 but no complaints ? >>>>>> } >>>>>> } >>>>> >>>>> >>>>> >>>>> This example below needs monitoringTupleKeyedStream to be >>>>> KeyedStream<Monitoring, Tuple6<String, String, String, String, String, >>>>> String>> >>>>> >>>>>> TypeInformation<Tuple6<String, String, String, String, String, >>>>>> String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, >>>>>> String, String, String, String>>(){}); >>>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new >>>>>> KeySelector<Monitoring, Tuple6<String, String, String, String, String, >>>>>> String>>() { >>>>>> @Override >>>>>> public Tuple6<String, String, String, String, >>>>>> String, String> getKey(Monitoring mon) throws Exception { >>>>>> String key = ""; >>>>>> String keyName = ""; >>>>>> //TODO: extract to a method to pull key to >>>>>> use from a config file >>>>>> final String eventName = mon.getEventName(); >>>>>> if (eventName != null && >>>>>> ((eventName.equalsIgnoreCase(INGRESS_FPS))) >>>>>> )) { >>>>>> keyName = PCAM_ID; >>>>>> key = mon.getEventDataMap() != null ? >>>>>> (String) mon.getEventDataMap().get(PCAM_ID) : ""; >>>>>> } else if (eventName != null && >>>>>> (eventName.equalsIgnoreCase(EGRESS_FPS))) { >>>>>> keyName = OUT_BITRATE; >>>>>> key = mon.getEventDataMap() != null ? >>>>>> (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify >>>>>> key >>>>>> to use >>>>>> } >>>>>> mon.setKeyName(keyName); >>>>>> mon.setKeyValue(key); >>>>>> return new Tuple6<>(mon.getDeployment(), >>>>>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), >>>>>> mon.getKeyValue()); >>>>>> } >>>>>> }, info); >>>>> >>>>> >>>>> TIA >>>>> >>>> >>>