Hi Chesnay, Sorry for causing the confusion. I solved the problem by following another person's recommendation on the other post about using a wrapper POJO. So, I used a wrapper MonitoringTuple to wrap the Tuple and that solved my problem with varying number of fields in the Tuple interface.
public class MonitoringTuple { > private Tuple tuple; > > Then, I used it like this: > KeyedStream<Map<String, Object>, MonitoringTuple> > monitoringTupleKeyedStream = kinesisStream.keyBy(new > MapTupleKeySelector(groupBySet)); The MapTupleKeySelector is defined below: > public static class MapTupleKeySelector implements KeySelector<Map<String, > Object>, MonitoringTuple> { private final Set<String> groupBySet; > public MapTupleKeySelector(Set<String> groupBySet) { this.groupBySet = groupBySet; } > @Override public MonitoringTuple getKey(Map<String, Object> inputMap) { int groupBySetSize = groupBySet.size(); Tuple tuple = Tuple.newInstance(groupBySetSize); int count = 0; for (String groupBy : groupBySet) { count = setTupleField(inputMap, tuple, count, groupBy); } return new MonitoringTuple(tuple); } } > public static int setTupleField(Map<String, Object> inputMap, Tuple > tuple, int count, String groupBy) { Object groupByValueObj = inputMap.get(groupBy); String tupleValue = Utils.convertToString(groupByValueObj); tuple.setField(tupleValue, count++); return count; } } TIA, On Thu, May 2, 2019 at 4:54 AM Chesnay Schepler <ches...@apache.org> wrote: > I'm not sure what you're asking. > > If you have a Deserialization schema that convert the data into a Map > you're done as I understand it, what do you believe to be missing? > > If, for a given job, the number/types of fields are fixed you could look > into using Row. > > On 01/05/2019 22:40, Vijay Balakrishnan wrote: > > Hi, > Had asked this questions earlier as topic - "Flink - Type Erasure > Exception trying to use Tuple6 instead of Tuple" > > Having issues defining a generic Tuple instead of a specific Tuple1,Tuple2 > etc. > Exception in thread "main" > org.apache.flink.api.common.functions.InvalidTypesException: Usage of class > Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, > Tuple2, etc.) instead. > > DataStream<Map<String, Object>> kinesisStream = ...; > KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream = > kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));//<===== complains > about Tuple type for monitoringTupleKeyedStream > ..... > > public static class MapTupleKeySelector implements KeySelector<Map<String, > Object>, Tuple> { > private final Set<String> groupBySet; > > public MapTupleKeySelector(Set<String> groupBySet) { > this.groupBySet = groupBySet; > } > > @Override > public Tuple getKey(Map<String, Object> inputMap) throws Exception > { > int groupBySetSize = groupBySet.size(); > Tuple tuple = Tuple.newInstance(groupBySetSize); > //Tuple1 tuple = new Tuple1(); > int count = 0; > for (String groupBy : groupBySet) { > tuple.setField(groupByValue, count++); > } > return tuple; > } > } > > Abhishek had replied back in the Thread as follows: (posting in that > thread as well creating a new thread): > However, If you are trying to build some generic framework and for > different streams, there would be different fields, you can follow the Map > approach. For the latter approach, you need to write extra mapper class > which will convert all the fields in the stream to the Map based stream. > > Can I get an example of how to create this extra Mapper class ? > > Currently, I am using deserialization to convert the incoming byte[] by > implementing KinesisDeserializationSchema<Map<String, Object>> to convert > to a DataStream<Map<String, Object>> kinesisStream. > > TIA, > > >