I solved the problem by following the recommendation in the other thread to use a wrapper POJO. I wrapped the Tuple in a MonitoringTuple class, which got around the problem of the key having a varying number of fields when it is typed as the abstract Tuple.
    public class MonitoringTuple {
        private Tuple tuple;

Then, I used it like this:

    KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
            kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));

The MapTupleKeySelector is defined below:

    public static class MapTupleKeySelector
            implements KeySelector<Map<String, Object>, MonitoringTuple> {

        private final Set<String> groupBySet;

        public MapTupleKeySelector(Set<String> groupBySet) {
            this.groupBySet = groupBySet;
        }

        @Override
        public MonitoringTuple getKey(Map<String, Object> inputMap) {
            int groupBySetSize = groupBySet.size();
            Tuple tuple = Tuple.newInstance(groupBySetSize);
            int count = 0;
            for (String groupBy : groupBySet) {
                count = setTupleField(inputMap, tuple, count, groupBy);
            }
            return new MonitoringTuple(tuple);
        }
    }

    public static int setTupleField(Map<String, Object> inputMap, Tuple tuple,
                                    int count, String groupBy) {
        Object groupByValueObj = inputMap.get(groupBy);
        String tupleValue = Utils.convertToString(groupByValueObj);
        tuple.setField(tupleValue, count++);
        return count;
    }
    }

TIA,

On Wed, May 1, 2019 at 1:39 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:

> Hi,
> I had asked this question earlier under the topic "Flink - Type Erasure
> Exception trying to use Tuple6 instead of Tuple".
>
> I am having issues defining a generic Tuple instead of a specific Tuple1,
> Tuple2, etc.
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead.
>
> DataStream<Map<String, Object>> kinesisStream = ...;
> KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
>     kinesisStream.keyBy(new MapTupleKeySelector(groupBySet)); //<===== complains about Tuple type for monitoringTupleKeyedStream
> .....
>
> public static class MapTupleKeySelector
>         implements KeySelector<Map<String, Object>, Tuple> {
>     private final Set<String> groupBySet;
>
>     public MapTupleKeySelector(Set<String> groupBySet) {
>         this.groupBySet = groupBySet;
>     }
>
>     @Override
>     public Tuple getKey(Map<String, Object> inputMap) throws Exception {
>         int groupBySetSize = groupBySet.size();
>         Tuple tuple = Tuple.newInstance(groupBySetSize);
>         //Tuple1 tuple = new Tuple1();
>         int count = 0;
>         for (String groupBy : groupBySet) {
>             tuple.setField(groupByValue, count++);
>         }
>         return tuple;
>     }
> }
>
> Abhishek had replied in the thread as follows (I am posting in that
> thread as well as creating a new thread):
> However, if you are trying to build some generic framework and for
> different streams, there would be different fields, you can follow the Map
> approach. For the latter approach, you need to write extra mapper class
> which will convert all the fields in the stream to the Map based stream.
>
> Can I get an example of how to create this extra Mapper class?
>
> Currently, I am using deserialization to convert the incoming byte[] by
> implementing KinesisDeserializationSchema<Map<String, Object>> to convert
> to a DataStream<Map<String, Object>> kinesisStream.
>
> TIA,
>
> On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma <abhioncbr.apa...@gmail.com>
> wrote:
>
>> I agree with Timothy, POJO would be a much better approach.
>>
>> However, if you are trying to build some generic framework and for
>> different streams, there would be different fields, you can follow the Map
>> approach. For the latter approach, you need to write extra mapper class
>> which will convert all the fields in the stream to the Map based stream.
>>
>> Abhishek
>>
>> On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor <vict...@gmail.com> wrote:
>>
>>> Could this just be solved by creating a POJO model class for your
>>> problem?
>>>
>>> That is, instead of using Tuple6 - create a class that encapsulates your
>>> data.
>>> This, I think, would solve your problem. But beyond that I think
>>> the code will be more understandable. It's hard to have a Tuple6 of all
>>> Strings, and remember what each one means -- even if I wrote the code :-)
>>> Furthermore, if and when you need to add more elements to your data model,
>>> you will need to refactor your entire Flink graph. Keeping a data model
>>> in a POJO protects against those things.
>>>
>>> The latter is just unsolicited code review feedback. And I know I gave
>>> it without much context to your problem. So please take it with a large
>>> grain of salt, and if it doesn't apply just ignore it.
>>>
>>> Tim
>>>
>>>
>>> On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>>> > I tried using [ keyBy(KeySelector, TypeInformation) ]
>>>>
>>>> What was the result of this approach?
>>>>
>>>> On 03/04/2019 17:36, Vijay Balakrishnan wrote:
>>>>
>>>> Hi Tim,
>>>> Thanks for your reply. I am not seeing an option to specify a
>>>> .returns(new TypeHint<Tuple6<String,
>>>> String,String,String,String,String>>(){}) with KeyedStream ??
>>>>
>>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>>> KeySelector<Monitoring, Tuple>() {
>>>>> public Tuple getKey(Monitoring mon) throws Exception
>>>>> {......return new Tuple6<>(..} })
>>>>
>>>> I tried using
>>>> TypeInformation<Tuple6<String, String, String, String, String, String>>
>>>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
>>>> String, String, String>>(){});
>>>>
>>>>> kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info);
>>>>> //specify typeInfo through
>>>>>
>>>>
>>>> TIA,
>>>> Vijay
>>>>
>>>> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com>
>>>> wrote:
>>>>
>>>>> Flink needs type information for serializing and deserializing
>>>>> objects, and that is lost due to Java type erasure. The only way to
>>>>> work around this is to specify the return type of the function called
>>>>> in the lambda.
>>>>>
>>>>> Fabian's answer here explains it well.
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
>>>>>
>>>>> Tim
>>>>>
>>>>> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvija...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am trying to use the KeyedStream with Tuple to handle different
>>>>>> types of Tuples including Tuple6.
>>>>>> I keep getting the Exception:
>>>>>> *Exception in thread "main"
>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of
>>>>>> class Tuple as a type is not allowed. Use a concrete subclass (e.g.
>>>>>> Tuple1, Tuple2, etc.) instead*.
>>>>>> Is there a way around Type Erasure here ?
>>>>>> I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on
>>>>>> to treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.
>>>>>>
>>>>>> Code below:
>>>>>>
>>>>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
>>>>>>> String keyOperationType = ....;//provided
>>>>>>> if (StringUtils.isNotEmpty(keyOperationType)) {
>>>>>>>     if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>>>>>>>         monitoringTupleKeyedStream =
>>>>>>>             kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
>>>>>>>     } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>>>>>>>         monitoringTupleKeyedStream =
>>>>>>>             kinesisStream.keyBy("deployment", "gameId", "eventName", "component",
>>>>>>>                 "instance");
>>>>>>>     } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>>>>>>>         TypeInformation<Tuple6<String, String, String, String, String, String>> info =
>>>>>>>             TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
>>>>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
>>>>>>>             public Tuple getKey(Monitoring
>>>>>>>                     mon) throws Exception {
>>>>>>>                 String key = "";
>>>>>>>                 String keyName = "";
>>>>>>>                 final String eventName = mon.getEventName();
>>>>>>>                 if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
>>>>>>>                     keyName = PCAM_ID;
>>>>>>>                     key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>>>>                 } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
>>>>>>>                     keyName = OUT_BITRATE;
>>>>>>>                     key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>>>>>                 }
>>>>>>>                 mon.setKeyName(keyName);
>>>>>>>                 mon.setKeyValue(key);
>>>>>>>                 return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
>>>>>>>                         mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>>>>>>             }
>>>>>>>         }); //, info)
>>>>>>>     } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>>>>>>>         monitoringTupleKeyedStream =
>>>>>>>             kinesisStream.keyBy("deployment", "gameId", "eventName", "component",
>>>>>>>                 "instance", "container"); //<== this is also a Tuple6 but no complaints ?
>>>>>>>     }
>>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> This example below needs monitoringTupleKeyedStream to be
>>>>>> KeyedStream<Monitoring, Tuple6<String, String, String, String, String,
>>>>>> String>>
>>>>>>
>>>>>>> TypeInformation<Tuple6<String, String, String, String, String, String>> info =
>>>>>>>     TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
>>>>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(
>>>>>>>     new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
>>>>>>>         @Override
>>>>>>>         public Tuple6<String, String, String, String, String, String> getKey(Monitoring mon)
>>>>>>>                 throws Exception {
>>>>>>>             String key = "";
>>>>>>>             String keyName = "";
>>>>>>>             //TODO: extract to a method to pull key to use from a config file
>>>>>>>             final String eventName = mon.getEventName();
>>>>>>>             if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
>>>>>>>                 keyName = PCAM_ID;
>>>>>>>                 key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>>>>             } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
>>>>>>>                 keyName = OUT_BITRATE;
>>>>>>>                 key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>>>>>             }
>>>>>>>             mon.setKeyName(keyName);
>>>>>>>             mon.setKeyValue(key);
>>>>>>>             return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
>>>>>>>                     mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>>>>>>         }
>>>>>>>     }, info);
>>>>>>
>>>>>>
>>>>>> TIA
>>>>>>
>>>>>
>>>>
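
A note on the wrapper-key approach described at the top of this thread: Flink partitions a keyed stream by the key's hashCode(), so a wrapper class used as a key should override equals() and hashCode() rather than rely on Object's identity-based defaults. The MonitoringTuple snippet is truncated and does not show whether it does. Below is a minimal sketch of such a key under clearly stated assumptions: MonitoringKey and from() are hypothetical names, a List<String> stands in for Flink's Tuple so the example runs without Flink on the classpath, and String.valueOf stands in for the Utils.convertToString helper, whose behavior is not shown. With a real Tuple field, the same two methods could presumably delegate to the tuple.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the MonitoringTuple wrapper. A List<String> plays
// the role of Flink's Tuple (assumption made so this compiles without Flink).
final class MonitoringKey {
    private final List<String> fields;

    MonitoringKey(List<String> fields) {
        // Defensive copy so the key cannot change after it has been hashed.
        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
    }

    // Mirrors the getKey() logic in the thread: pick the group-by values out
    // of the record, in the iteration order of groupBySet.
    static MonitoringKey from(Map<String, Object> record, Set<String> groupBySet) {
        List<String> values = new ArrayList<>(groupBySet.size());
        for (String groupBy : groupBySet) {
            // String.valueOf is an assumed replacement for Utils.convertToString.
            values.add(String.valueOf(record.get(groupBy)));
        }
        return new MonitoringKey(values);
    }

    // Value-based equality: two keys are equal when their fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MonitoringKey)) {
            return false;
        }
        return fields.equals(((MonitoringKey) o).fields);
    }

    // Derived only from the field values, so it is consistent with equals().
    @Override
    public int hashCode() {
        return fields.hashCode();
    }

    @Override
    public String toString() {
        return "MonitoringKey" + fields;
    }
}
```

With this in place, two records that agree on the group-by fields produce equal keys with equal hash codes, regardless of their other fields. An ordered set (for example a LinkedHashSet) for groupBySet keeps the field positions stable between calls.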