I solved the problem by following the recommendation in the other thread
to use a wrapper POJO.
I wrapped the Tuple in a MonitoringTuple class, and that solved my
problem with the varying number of fields in the Tuple key.

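public class MonitoringTuple {
    private Tuple tuple;

    // Constructor used by MapTupleKeySelector.getKey() below.
    public MonitoringTuple(Tuple tuple) {
        this.tuple = tuple;
    }
}
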
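A note on the wrapper: as far as I understand, Flink hashes and compares keys to route records, so a wrapper class used as a key should override equals() and hashCode() rather than rely on the Object defaults. Below is a minimal sketch of that idea. WrappedKey and WrappedKeyDemo are hypothetical names, and a List<String> stands in for Flink's Tuple so the sketch compiles without Flink on the classpath; in the real MonitoringTuple the two methods would presumably delegate to the wrapped Tuple.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for MonitoringTuple: wraps a variable number of key fields.
// A List<String> replaces Flink's Tuple (an assumption made for this sketch only).
final class WrappedKey {
    private final List<String> fields;

    WrappedKey(List<String> fields) {
        // Defensive copy so the key cannot change after it has been created.
        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof WrappedKey)) {
            return false;
        }
        // Two keys are equal when they hold the same fields in the same order.
        return fields.equals(((WrappedKey) other).fields);
    }

    @Override
    public int hashCode() {
        // Derived from the field values, so equal keys always hash alike.
        return fields.hashCode();
    }

    @Override
    public String toString() {
        return "WrappedKey" + fields;
    }
}

class WrappedKeyDemo {
    public static void main(String[] args) {
        WrappedKey a = new WrappedKey(Arrays.asList("prod", "game-1", "INGRESS_FPS"));
        WrappedKey b = new WrappedKey(Arrays.asList("prod", "game-1", "INGRESS_FPS"));
        WrappedKey c = new WrappedKey(Arrays.asList("prod", "game-1"));

        System.out.println(a.equals(b));                  // prints true
        System.out.println(a.hashCode() == b.hashCode()); // prints true
        System.out.println(a.equals(c));                  // prints false
    }
}
```

The same pattern applies to MonitoringTuple regardless of how many fields the wrapped Tuple holds.
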
Then, I used it like this:

KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));


The MapTupleKeySelector is defined below:

    public static class MapTupleKeySelector
            implements KeySelector<Map<String, Object>, MonitoringTuple> {

        private final Set<String> groupBySet;

        public MapTupleKeySelector(Set<String> groupBySet) {
            this.groupBySet = groupBySet;
        }

        @Override
        public MonitoringTuple getKey(Map<String, Object> inputMap) {
            int groupBySetSize = groupBySet.size();
            Tuple tuple = Tuple.newInstance(groupBySetSize);
            int count = 0;
            for (String groupBy : groupBySet) {
                count = setTupleField(inputMap, tuple, count, groupBy);
            }
            return new MonitoringTuple(tuple);
        }
    }

    public static int setTupleField(Map<String, Object> inputMap, Tuple tuple,
                                    int count, String groupBy) {
        Object groupByValueObj = inputMap.get(groupBy);
        String tupleValue = Utils.convertToString(groupByValueObj);
        tuple.setField(tupleValue, count++);
        return count;
    }

}
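
One thing to watch in the selector above: the position of each value in the key follows the iteration order of groupBySet, and HashSet does not specify that order. A minimal sketch of the extraction step with an explicitly ordered set is below. It uses plain Java only; GroupByKeyFields and extract are hypothetical names, and String.valueOf is an assumed stand-in for Utils.convertToString, whose behavior is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class GroupByKeyFields {

    // Collects the group-by values of one record in the set's iteration order.
    // String.valueOf stands in for Utils.convertToString (an assumption);
    // with it, a missing field becomes the string "null".
    static List<String> extract(Map<String, Object> record, Set<String> groupBy) {
        List<String> fields = new ArrayList<>(groupBy.size());
        for (String name : groupBy) {
            fields.add(String.valueOf(record.get(name)));
        }
        return fields;
    }

    public static void main(String[] args) {
        // LinkedHashSet keeps insertion order, so field positions are predictable.
        Set<String> groupBy = new LinkedHashSet<>(Arrays.asList("deployment", "gameId"));

        Map<String, Object> record = new HashMap<>();
        record.put("deployment", "prod");
        record.put("gameId", 42);
        record.put("fps", 59.9); // not part of the key, so it is ignored

        System.out.println(extract(record, groupBy)); // prints [prod, 42]
    }
}
```

Passing a LinkedHashSet or TreeSet as groupBySet makes the field order explicit instead of leaving it to the set implementation.
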





TIA,

On Wed, May 1, 2019 at 1:39 PM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi,
> I asked this question earlier under the topic "Flink - Type Erasure
> Exception trying to use Tuple6 instead of Tuple".
>
> I am having trouble defining a generic Tuple key instead of a specific
> Tuple1, Tuple2, etc.
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead.
>
> DataStream<Map<String, Object>> kinesisStream = ...;
> // <===== complains about Tuple type for monitoringTupleKeyedStream
> KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));
> .....
>
> public static class MapTupleKeySelector
>         implements KeySelector<Map<String, Object>, Tuple> {
>         private final Set<String> groupBySet;
>
>         public MapTupleKeySelector(Set<String> groupBySet) {
>             this.groupBySet = groupBySet;
>         }
>
>         @Override
>         public Tuple getKey(Map<String, Object> inputMap) throws Exception {
>             int groupBySetSize = groupBySet.size();
>             Tuple tuple = Tuple.newInstance(groupBySetSize);
>             //Tuple1 tuple = new Tuple1();
>             int count = 0;
>             for (String groupBy : groupBySet) {
>                 tuple.setField(groupByValue, count++);
>             }
>             return tuple;
>         }
>     }
>
> Abhishek replied in that thread as follows (I am posting in that
> thread as well as creating this new one):
> However, if you are trying to build a generic framework in which
> different streams have different fields, you can follow the Map
> approach. For that approach, you need to write an extra mapper class
> that converts all the fields in the stream to a Map-based stream.
>
> Can I get an example of how to create this extra mapper class?
>
> Currently, I implement KinesisDeserializationSchema<Map<String, Object>>
> to deserialize the incoming byte[] into a
> DataStream<Map<String, Object>> kinesisStream.
>
> TIA,
>
> On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma <abhioncbr.apa...@gmail.com>
> wrote:
>
>> I agree with Timothy, POJO would be a much better approach.
>>
>> However, if you are trying to build a generic framework in which
>> different streams have different fields, you can follow the Map
>> approach. For that approach, you need to write an extra mapper class
>> that converts all the fields in the stream to a Map-based stream.
>>
>> Abhishek
>>
>> On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor <vict...@gmail.com> wrote:
>>
>>> Could this just be solved by creating a POJO model class for your
>>> problem?
>>>
>>> That is, instead of using Tuple6 - create a class that encapsulates your
>>> data.   This, I think, would solve your problem.  But beyond that I think
>>> the code will be more understandable.  It's hard to have a Tuple6 of all
>>> Strings, and remember what each one means -- even if I wrote the code :-)
>>> Furthermore, if and when you need to add more elements to your data model,
>>> you will need to refactor your entire Flink graph.   Keeping a data model
>>> in POJO protects against those things.
>>>
>>> The latter is just unsolicited code review feedback.   And I know I gave
>>> it without much context to your problem.  So please take with a large grain
>>> of salt, and if it doesn't apply just ignore it.
>>>
>>> Tim
>>>
>>>
>>> On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>>> > I tried using  [ keyBy(KeySelector, TypeInformation) ]
>>>>
>>>> What was the result of this approach?
>>>>
>>>> On 03/04/2019 17:36, Vijay Balakrishnan wrote:
>>>>
>>>> Hi Tim,
>>>> Thanks for your reply. I am not seeing an option to specify a
>>>> .returns(new TypeHint<Tuple6<String,
>>>> String,String,String,String,String>>(){}) with KeyedStream ??
>>>>
>>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>>> KeySelector<Monitoring, Tuple>() {
>>>>>             public Tuple getKey(Monitoring mon) throws Exception
>>>>> {......return new Tuple6<>(..}    })
>>>>
>>>> I tried using
>>>> TypeInformation<Tuple6<String, String, String, String, String, String>>
>>>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
>>>> String, String, String>>(){});
>>>>
>>>>> kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info);
>>>>> //specify typeInfo through
>>>>>
>>>>
>>>> TIA,
>>>> Vijay
>>>>
>>>> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com>
>>>> wrote:
>>>>
>>>>> Flink needs type information for serializing and deserializing
>>>>> objects, and that is lost due to Java type erasure.   The only way to
>>>>> work around this is to specify the return type of the function called
>>>>> in the lambda.
>>>>>
>>>>> Fabian's answer here explains it well.
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
>>>>>
>>>>> Tim
>>>>>
>>>>> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvija...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am trying to use a KeyedStream with Tuple to handle different
>>>>>> types of Tuples, including Tuple6.
>>>>>> I keep getting the exception:
>>>>>> *Exception in thread "main"
>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of
>>>>>> class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
>>>>>> Tuple2, etc.) instead*.
>>>>>> Is there a way around type erasure here?
>>>>>> I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on
>>>>>> and treat a Tuple6 as a plain Tuple, like the monitoringTupleKeyedStream.
>>>>>>
>>>>>> Code below:
>>>>>>
>>>>>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
>>>>>>> String keyOperationType = ....;//provided
>>>>>>> if (StringUtils.isNotEmpty(keyOperationType)) {
>>>>>>>     if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>>>>>>>         monitoringTupleKeyedStream =
>>>>>>>                 kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
>>>>>>>     } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>>>>>>>         monitoringTupleKeyedStream =
>>>>>>>                 kinesisStream.keyBy("deployment", "gameId", "eventName", "component",
>>>>>>>                         "instance");
>>>>>>>     } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>>>>>>>         TypeInformation<Tuple6<String, String, String, String, String, String>> info =
>>>>>>>                 TypeInformation.of(new TypeHint<Tuple6<String, String, String, String,
>>>>>>>                         String, String>>(){});
>>>>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
>>>>>>>             public Tuple getKey(Monitoring mon) throws Exception {
>>>>>>>                 String key = "";
>>>>>>>                 String keyName = "";
>>>>>>>                 final String eventName = mon.getEventName();
>>>>>>>                 if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
>>>>>>>                     keyName = PCAM_ID;
>>>>>>>                     key = mon.getEventDataMap() != null
>>>>>>>                             ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>>>>                 } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
>>>>>>>                     keyName = OUT_BITRATE;
>>>>>>>                     //TODO: identify key to use
>>>>>>>                     key = mon.getEventDataMap() != null
>>>>>>>                             ? (String) mon.getEventDataMap().get(OUT_BITRATE) : "";
>>>>>>>                 }
>>>>>>>                 mon.setKeyName(keyName);
>>>>>>>                 mon.setKeyValue(key);
>>>>>>>                 return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
>>>>>>>                         mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>>>>>>             }
>>>>>>>         }); //, info)
>>>>>>>     } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>>>>>>>         //<== this is also a Tuple6 but no complaints ?
>>>>>>>         monitoringTupleKeyedStream =
>>>>>>>                 kinesisStream.keyBy("deployment", "gameId", "eventName", "component",
>>>>>>>                         "instance", "container");
>>>>>>>     }
>>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> The example below needs monitoringTupleKeyedStream to be declared as
>>>>>> KeyedStream<Monitoring, Tuple6<String, String, String, String, String,
>>>>>> String>>:
>>>>>>
>>>>>>> TypeInformation<Tuple6<String, String, String, String, String, String>> info =
>>>>>>>         TypeInformation.of(new TypeHint<Tuple6<String, String, String, String,
>>>>>>>                 String, String>>(){});
>>>>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(
>>>>>>>         new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
>>>>>>>             @Override
>>>>>>>             public Tuple6<String, String, String, String, String, String> getKey(
>>>>>>>                     Monitoring mon) throws Exception {
>>>>>>>                 String key = "";
>>>>>>>                 String keyName = "";
>>>>>>>                 //TODO: extract to a method to pull key to use from a config file
>>>>>>>                 final String eventName = mon.getEventName();
>>>>>>>                 if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
>>>>>>>                     keyName = PCAM_ID;
>>>>>>>                     key = mon.getEventDataMap() != null
>>>>>>>                             ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>>>>                 } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
>>>>>>>                     keyName = OUT_BITRATE;
>>>>>>>                     //TODO: identify key to use
>>>>>>>                     key = mon.getEventDataMap() != null
>>>>>>>                             ? (String) mon.getEventDataMap().get(OUT_BITRATE) : "";
>>>>>>>                 }
>>>>>>>                 mon.setKeyName(keyName);
>>>>>>>                 mon.setKeyValue(key);
>>>>>>>                 return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
>>>>>>>                         mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>>>>>>             }
>>>>>>>         }, info);
>>>>>>
>>>>>>
>>>>>> TIA
>>>>>>
>>>>>
>>>>
