> I tried using  [ keyBy(KeySelector, TypeInformation) ]

What was the result of this approach?

On 03/04/2019 17:36, Vijay Balakrishnan wrote:
Hi Tim,
Thanks for your reply. I am not seeing an option to specify .returns(new TypeHint<Tuple6<String, String, String, String, String, String>>(){}) on a KeyedStream:

    monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
        public Tuple getKey(Monitoring mon) throws Exception {......return new Tuple6<>(..}
    })

I tried using
TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});

    kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info); //specify typeInfo through


TIA,
Vijay

On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com> wrote:

    Flink needs type information for serializing and deserializing
    objects, and that is lost due to Java type erasure. The only way
    to work around this is to specify the return type of the function
    called in the lambda.

    Fabian's answer here explains it well.

    
    https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554

    Tim
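
To illustrate the type-erasure point in the reply above, here is a minimal, Flink-free sketch in plain Java. It shows that two differently parameterized lists share one runtime class, and that an anonymous subclass (the same `new ...(){}` pattern used with TypeHint) keeps its type argument readable through reflection. TypeCapture and ErasureDemo are hypothetical names invented for this sketch; this is not Flink's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a TypeHint-like helper (assumption, not Flink code).
abstract class TypeCapture<T> {
    private final Type captured;

    protected TypeCapture() {
        // An anonymous subclass stores its generic superclass in class metadata,
        // so the type argument survives erasure and can be read back here.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("Create as an anonymous subclass with a type argument");
        }
        this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getType() {
        return captured;
    }
}

class ErasureDemo {
    // Erasure: both lists have the same runtime class, ArrayList.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    // The trailing {} creates an anonymous subclass that records List<String>.
    static String capturedTypeName() {
        return new TypeCapture<List<String>>() {}.getType().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        System.out.println(capturedTypeName());
    }
}
```

The trailing `{}` is what matters: without it there is no subclass whose metadata could carry the type argument.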

    On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:

        Hi,
        I am trying to use a KeyedStream with Tuple keys to handle
        different types of tuples, including Tuple6.
        I keep getting this exception:
        *Exception in thread "main"
        org.apache.flink.api.common.functions.InvalidTypesException:
        Usage of class Tuple as a type is not allowed. Use a concrete
        subclass (e.g. Tuple1, Tuple2, etc.) instead*.
        Is there a way around type erasure here?
        I want to use KeyedStream<Monitoring, Tuple> so that I can
        pass it on and treat a Tuple6 as a Tuple, as with
        monitoringTupleKeyedStream.

        Code below:

            KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
            String keyOperationType = ....;//provided
            if (StringUtils.isNotEmpty(keyOperationType)) {
                if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
                    monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
                } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
                    monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance");
                } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
                    TypeInformation<Tuple6<String, String, String, String, String, String>> info =
                        TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
                    monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
                        public Tuple getKey(Monitoring mon) throws Exception {
                            String key = "";
                            String keyName = "";
                            final String eventName = mon.getEventName();
                            if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
                                keyName = PCAM_ID;
                                key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
                            } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
                                keyName = OUT_BITRATE;
                                key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
                            }
                            mon.setKeyName(keyName);
                            mon.setKeyValue(key);
                            return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
                        }
                    }); //, info)
                } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
                    monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance", "container"); //<== this is also a Tuple6 but no complaints ?
                }
            }



        The example below needs monitoringTupleKeyedStream to be declared as
        KeyedStream<Monitoring, Tuple6<String, String, String, String, String, String>>:

            TypeInformation<Tuple6<String, String, String, String, String, String>> info =
                TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
            monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
                @Override
                public Tuple6<String, String, String, String, String, String> getKey(Monitoring mon) throws Exception {
                    String key = "";
                    String keyName = "";
                    //TODO: extract to a method to pull key to use from a config file
                    final String eventName = mon.getEventName();
                    if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
                        keyName = PCAM_ID;
                        key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
                    } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
                        keyName = OUT_BITRATE;
                        key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
                    }
                    mon.setKeyName(keyName);
                    mon.setKeyValue(key);
                    return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
                }
            }, info);

        TIA
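
As a rough illustration of the difference between the two KeySelector declarations in this message, the plain-Java sketch below inspects what reflection can recover from an anonymous selector class. When the key is declared as the abstract base type, only that base class is visible. When it is declared as a concrete parameterized tuple, the arity and field types are available. KeySelectorLike, TupleLike, Tuple2Like and KeyTypeDemo are hypothetical stand-ins invented for this sketch. Flink's actual type extraction is more involved, so this only approximates the kind of information it has to work with.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins for KeySelector, Tuple and a concrete TupleN (assumptions).
interface KeySelectorLike<IN, KEY> {
    KEY getKey(IN value);
}

abstract class TupleLike {
}

class Tuple2Like<A, B> extends TupleLike {
    final A f0;
    final B f1;

    Tuple2Like(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

class KeyTypeDemo {
    // Reads the KEY type argument recorded on the selector's implementing class.
    static Type declaredKeyType(KeySelectorLike<?, ?> selector) {
        for (Type t : selector.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == KeySelectorLike.class) {
                    return p.getActualTypeArguments()[1];
                }
            }
        }
        throw new IllegalArgumentException("Selector does not declare its type arguments");
    }

    // Key declared as the abstract base type: arity and field types are not recorded.
    static KeySelectorLike<String, TupleLike> vagueSelector() {
        return new KeySelectorLike<String, TupleLike>() {
            @Override
            public TupleLike getKey(String value) {
                return new Tuple2Like<>(value, value.length());
            }
        };
    }

    // Key declared as a concrete parameterized tuple: the full type is recorded.
    static KeySelectorLike<String, Tuple2Like<String, Integer>> preciseSelector() {
        return new KeySelectorLike<String, Tuple2Like<String, Integer>>() {
            @Override
            public Tuple2Like<String, Integer> getKey(String value) {
                return new Tuple2Like<>(value, value.length());
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(declaredKeyType(vagueSelector()));
        System.out.println(declaredKeyType(preciseSelector()));
    }
}
```

In the first case the declared key type is just the abstract base class, even though the object returned at runtime is a concrete tuple.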

