Hi Yukun,

I think the problem with the input type inference is that SortedMap is treated as a GenericType and not as a Flink native type (like Tuple or POJO). This case is not supported at the moment. You can create an issue if you like; maybe there is a way to support this special type inference case.

Timo


On 09.07.2016 11:55, Yukun Guo wrote:
Hi Robert,

On 9 July 2016 at 00:25, Robert Metzger <rmetz...@apache.org> wrote:

    Hi Yukun,

    Can you also post the code showing how you invoke the
    GenericFlatMapper on the mailing list?


Here is the code defining the topology:
DataStream<String> stream = ...;
stream
    .keyBy(new KeySelector<String, Integer>() {
        @Override
        public Integer getKey(String x) throws Exception {
            return x.hashCode() % 10;
        }
    })
    .timeWindow(Time.seconds(10))
    .fold(new TreeMap<String, Long>(), new FoldFunction<String, SortedMap<String, Long>>() {
        @Override
        public SortedMap<String, Long> fold(SortedMap<String, Long> map, String x) {
            Long current = map.get(x);
            Long updated = current != null ? current + 1 : 1;
            map.put(x, updated);
            return map;
        }
    })
    .flatMap(new GenericFlatMapper<String>())
    .returns(new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo()) // throws InvalidTypesException if you comment out this line
    .print();


    The Java compiler usually drops the generic types during
    compilation ("type erasure"), which is why we cannot infer the types.
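
For illustration, here is a minimal plain-Java sketch of the erasure effect described above. It does not use Flink at all: Mapper and Hint are hypothetical stand-ins (not Flink classes) for a generic user function and for a type-hint token. The sketch assumes only standard java.lang.reflect behaviour: an instance created with "new Mapper<String>()" carries no record of String, while an anonymous subclass keeps its generic superclass in the class file, which is presumably why the "new TypeHint<...>(){}" idiom can supply the missing type.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class ErasureDemo {
    // Hypothetical stand-in for a generic user function such as GenericFlatMapper<T>.
    static class Mapper<T> { }

    // Hypothetical stand-in for a type-hint token that is meant to be subclassed anonymously.
    static abstract class Hint<T> { }

    /** Returns the type arguments recoverable from the object's generic superclass, or "none". */
    static String typeArgsOf(Object o) {
        Type superType = o.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return Arrays.toString(((ParameterizedType) superType).getActualTypeArguments());
        }
        return "none";
    }

    public static void main(String[] args) {
        // Plain instantiation: the String argument is erased, nothing to recover.
        System.out.println(typeArgsOf(new Mapper<String>()));      // none
        // Anonymous subclasses record their generic superclass, so String survives.
        System.out.println(typeArgsOf(new Mapper<String>() { }));  // [class java.lang.String]
        System.out.println(typeArgsOf(new Hint<String>() { }));    // [class java.lang.String]
    }
}
```

In the plain-instantiation case, a framework that wants to know T has to obtain it from somewhere else, for example from the type of the input or from an explicit hint.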


The error message implies type extraction should be possible when "all variables in the return type can be deduced from the input type(s)". This is true for flatMap(Tuple2<Long, T>, Collector<Tuple2<T, String>>), but if the signature is changed to void flatMap(SortedMap<T, Long>, Collector<Tuple2<T, Long>>), type inference fails.


    On Fri, Jul 8, 2016 at 12:27 PM, Yukun Guo <gyk....@gmail.com> wrote:

        Hi,
        When I run the code implementing a generic FlatMapFunction,
        Flink complained about InvalidTypesException:

        public class GenericFlatMapper<T> implements FlatMapFunction<SortedMap<T, Long>, Tuple2<T, Long>> {
            @Override
            public void flatMap(SortedMap<T, Long> m, Collector<Tuple2<T, Long>> out) throws Exception {
                for (Map.Entry<T, Long> entry : m.entrySet()) {
                    out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
                }
            }
        }

        Exception in thread "main"
        org.apache.flink.api.common.functions.InvalidTypesException:
        The return type of function could not be determined
        automatically, due to type erasure. You can give type
        information hints by using the returns(...) method on the
        result of the transformation call, or by letting your function
        implement the 'ResultTypeQueryable' interface.
        ...
        Caused by:
        org.apache.flink.api.common.functions.InvalidTypesException:
        Type of TypeVariable 'T' in 'class GenericFlatMapper' could
        not be determined. This is most likely a type erasure problem.
        The type extraction currently supports types with generic
        variables only in cases where all variables in the return type
        can be deduced from the input type(s).

        This puzzles me, as Flink should be able to infer the type from
        the arguments. I know about returns(...) and other workarounds for
        giving type hints, but they are kind of verbose. Any suggestions?





--
Freundliche Grüße / Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
