Hi Chesnay,
Sorry for causing the confusion. I solved the problem by following another
person's recommendation in the other thread: use a wrapper POJO.
So I wrapped the Tuple in a MonitoringTuple class, and that solved my problem
with the varying number of fields, since Flink does not accept the abstract
Tuple class itself as a type.

public class MonitoringTuple {
    private Tuple tuple;

    public MonitoringTuple(Tuple tuple) {
        this.tuple = tuple;
    }
    // remaining members not shown
}

Then, I used it like this:

KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));


The MapTupleKeySelector is defined below:

    public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, MonitoringTuple> {

        private final Set<String> groupBySet;

        public MapTupleKeySelector(Set<String> groupBySet) {
            this.groupBySet = groupBySet;
        }

        @Override
        public MonitoringTuple getKey(Map<String, Object> inputMap) {
            // One tuple position per group-by field; Tuple.newInstance returns the concrete TupleN for that arity
            int groupBySetSize = groupBySet.size();
            Tuple tuple = Tuple.newInstance(groupBySetSize);
            int count = 0;
            for (String groupBy : groupBySet) {
                count = setTupleField(inputMap, tuple, count, groupBy);
            }
            // Return the concrete wrapper so the abstract Tuple is never the declared key type
            return new MonitoringTuple(tuple);
        }
    }

    // Stores the value of field groupBy, converted to a String, at position count and returns the next position
    public static int setTupleField(Map<String, Object> inputMap, Tuple tuple, int count, String groupBy) {
        Object groupByValueObj = inputMap.get(groupBy);
        String tupleValue = Utils.convertToString(groupByValueObj);
        tuple.setField(tupleValue, count++);
        return count;
    }

}
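One thing worth checking with this approach: Flink hash-partitions a keyed stream on the key's hashCode(), so records with the same group-by values only land on the same subtask if equal keys also have equal hash codes. As far as I know, recent Flink versions also refuse POJO key types that do not override hashCode(). So MonitoringTuple should implement equals() and hashCode(), for example by delegating to the wrapped Tuple (Flink's TupleN classes already implement both). The sketch below shows the same idea on the plain JDK; it assumes a hypothetical GroupKey class in place of MonitoringTuple, a List<String> in place of the Flink Tuple, and String.valueOf in place of Utils.convertToString, so it is an illustration rather than a drop-in class. It takes the group-by fields as a List because field order is part of the key, and a plain HashSet does not promise any particular iteration order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MonitoringTuple: a List<String> replaces the Flink Tuple
// so that this sketch compiles and runs on the plain JDK.
final class GroupKey {
    private final List<String> values;

    GroupKey(List<String> values) {
        // Unmodifiable copy: a key must not change after it has been hashed.
        this.values = Collections.unmodifiableList(new ArrayList<>(values));
    }

    // Builds the key from one record, reading the group-by fields in the given order.
    static GroupKey fromRecord(Map<String, Object> record, List<String> groupByFields) {
        List<String> values = new ArrayList<>(groupByFields.size());
        for (String field : groupByFields) {
            // String.valueOf stands in for Utils.convertToString; a missing field becomes "null".
            values.add(String.valueOf(record.get(field)));
        }
        return new GroupKey(values);
    }

    List<String> values() {
        return values;
    }

    // Two keys are equal when they hold the same values in the same order.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof GroupKey)) {
            return false;
        }
        return values.equals(((GroupKey) other).values);
    }

    // Consistent with equals(): List.hashCode() depends only on the elements and their order.
    @Override
    public int hashCode() {
        return values.hashCode();
    }

    @Override
    public String toString() {
        return "GroupKey" + values;
    }
}
```

In MonitoringTuple itself, the equivalent would be an equals() that compares the wrapped tuples and a hashCode() that returns tuple.hashCode().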

TIA,

On Thu, May 2, 2019 at 4:54 AM Chesnay Schepler <ches...@apache.org> wrote:

> I'm not sure what you're asking.
>
> If you have a deserialization schema that converts the data into a Map,
> then as I understand it you're done. What do you believe to be missing?
>
> If, for a given job, the number/types of fields are fixed, you could look
> into using Row.
>
> On 01/05/2019 22:40, Vijay Balakrishnan wrote:
>
> Hi,
> I had asked this question earlier under the topic "Flink - Type Erasure
> Exception trying to use Tuple6 instead of Tuple".
>
> I'm having issues defining a generic Tuple instead of a specific Tuple1,
> Tuple2, etc.
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead.
>
> DataStream<Map<String, Object>> kinesisStream = ...;
> KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet)); //<===== complains about Tuple type for monitoringTupleKeyedStream
> .....
>
>     public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, Tuple> {
>         private final Set<String> groupBySet;
>
>         public MapTupleKeySelector(Set<String> groupBySet) {
>             this.groupBySet = groupBySet;
>         }
>
>         @Override
>         public Tuple getKey(Map<String, Object> inputMap) throws Exception {
>             int groupBySetSize = groupBySet.size();
>             Tuple tuple = Tuple.newInstance(groupBySetSize);
>             //Tuple1 tuple = new Tuple1();
>             int count = 0;
>             for (String groupBy : groupBySet) {
>                 Object groupByValue = inputMap.get(groupBy);
>                 tuple.setField(groupByValue, count++);
>             }
>             return tuple;
>         }
>     }
>
> Abhishek replied in that thread as follows (I'm posting in that thread as
> well as creating this new one):
> However, if you are trying to build some generic framework where different
> streams have different fields, you can follow the Map approach. For the
> latter approach, you need to write an extra mapper class that converts all
> the fields in the stream into a Map-based stream.
>
> Can I get an example of how to create this extra mapper class?
>
> Currently, I convert the incoming byte[] by implementing
> KinesisDeserializationSchema<Map<String, Object>>, which gives me a
> DataStream<Map<String, Object>> kinesisStream.
>
> TIA,
>
>
>
