> I tried using  [ keyBy(KeySelector, TypeInformation) ]

What was the result of this approach?

On 03/04/2019 17:36, Vijay Balakrishnan wrote:
Hi Tim,
Thanks for your reply. I am not seeing an option to specify a .returns(new TypeHint<Tuple6<String, String,String,String,String,String>>(){}) with KeyedStream ??

    monitoringTupleKeyedStream = kinesisStream.keyBy(new
    KeySelector<Monitoring, Tuple>() {
                public Tuple getKey(Monitoring mon) throws Exception
    {......return new Tuple6<>(..}   })

I tried using
TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});

    kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...},
    info); //specify typeInfo through


On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com <mailto:vict...@gmail.com>> wrote:

    Flink needs type information for serializing and deserializing
    objects, and that is lost due to Java type erasure.   The only way
    to workaround this is to specify the return type of the function
    called in the lambda.

    Fabian's answer here explains it well.



    On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan
    <bvija...@gmail.com <mailto:bvija...@gmail.com>> wrote:

        I am trying to use the KeyedStream with Tuple to handle
        diffrent types of Tuples including Tuple6.
        Keep getting the Exception:
        *Exception in thread "main"
        Usage of class Tuple as a type is not allowed. Use a concrete
        subclass (e.g. Tuple1, Tuple2, etc.) instead*.
        Is there a way around Type Erasure here ?
        I want to use KeyedStream<Monitoring, Tuple> so that I can
        pass it on to treat Tuple6 as a Tuple like the

        Code below:

            KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream
            = null;
            String keyOperationType = ....;//provided
            if (StringUtils.isNotEmpty(keyOperationType)) {
                    monitoringTupleKeyedStream =
            kinesisStream.keyBy("deployment", "gameId", "eventName",
                } else if
                    monitoringTupleKeyedStream =
            kinesisStream.keyBy("deployment", "gameId", "eventName",
            "component", "instance");
                } else if
                    TypeInformation<Tuple6<String, String, String,
            String, String, String>> info = TypeInformation.of(new
            TypeHint<Tuple6<String, String, String, String, String,
                    monitoringTupleKeyedStream =
            kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
                        public Tuple getKey(Monitoring mon) throws
            Exception {
                            String key = "";
                            String keyName = "";
                            final String eventName = mon.getEventName();
                            if (eventName != null &&
                            )) {
                                keyName = PCAM_ID;
                                key = mon.getEventDataMap() != null ?
            (String) mon.getEventDataMap().get(PCAM_ID) : "";
                            } else if (eventName != null &&
            (eventName.equalsIgnoreCase(EGRESS_FPS))) {
                                keyName = OUT_BITRATE;
                                key = mon.getEventDataMap() != null ?
            (String) mon.getEventDataMap().get(OUT_BITRATE) : "";
            //TODO: identify key to use
                            return new Tuple6<>(mon.getDeployment(),
            mon.getGameId(), eventName, mon.getComponent(),
            mon.getKeyName(), mon.getKeyValue());
                    }); //, info)
                } else if
                    monitoringTupleKeyedStream =
            kinesisStream.keyBy("deployment", "gameId", "eventName",
            "component", "instance", "container"); //<== this is also
            a Tuple6 but no complaints ?

        This example below needs monitoringTupleKeyedStream  to be
        KeyedStream<Monitoring, Tuple6<String, String, String, String,
        String, String>>

            TypeInformation<Tuple6<String, String, String, String,
            String, String>> info = TypeInformation.of(new
            TypeHint<Tuple6<String, String, String, String, String,
            monitoringTupleKeyedStream = kinesisStream.keyBy(new
            KeySelector<Monitoring, Tuple6<String, String, String,
            String, String, String>>() {
                                public Tuple6<String, String, String,
            String, String, String> getKey(Monitoring mon) throws
            Exception {
                                    String key = "";
                                    String keyName = "";
                                    //TODO: extract to a method to
            pull key to use from a config file
                                    final String eventName =
                                    if (eventName != null &&
                                    )) {
                                        keyName = PCAM_ID;
                                        key = mon.getEventDataMap() !=
            null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
                                    } else if (eventName != null &&
            (eventName.equalsIgnoreCase(EGRESS_FPS))) {
                                        keyName = OUT_BITRATE;
                                        key = mon.getEventDataMap() !=
            null ? (String) mon.getEventDataMap().get(OUT_BITRATE) :
            ""; //TODO: identify key to use
                                    return new
            Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
            mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
}, info);


Reply via email to