Hi Robert, On 9 July 2016 at 00:25, Robert Metzger <rmetz...@apache.org> wrote:
> Hi Yukun, > > can you also post the code how you are invoking the GenericFlatMapper on > the mailing list? > Here is the code defining the topology: DataStream<String> stream = ...; stream .keyBy(new KeySelector<String, Integer>() { @Override public Integer getKey(String x) throws Exception { return x.hashCode() % 10; } }) .timeWindow(Time.seconds(10)) .fold(new TreeMap<String, Long>(), new FoldFunction<String, SortedMap<String, Long>>() { @Override public SortedMap<String, Long> fold(SortedMap<String, Long> map, String x) { Long current = map.get(x); Long updated = current != null ? current + 1 : 1; map.put(x, updated); return map; } }) .flatMap(new GenericFlatMapper<String>()) .returns(new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo()) // throws InvalidTypesException if you comment out this line .print(); > > The Java compiler is usually dropping the generic types during compilation > ("type erasure"), that's why we can not infer the types. > > The error message implies type extraction should be possible when "all variables in the return type can be deduced from the input type(s)". This is true for flatMap(Tuple2<Long, T>, Collector<Tuple2<T, String>>), but if the signature is changed to void flatMap(SortedMap<T, Long>, Collector<Tuple2<T, Long>>), type inference fails. > > On Fri, Jul 8, 2016 at 12:27 PM, Yukun Guo <gyk....@gmail.com> wrote: > >> Hi, >> When I run the code implementing a generic FlatMapFunction, Flink >> complained about InvalidTypesException: >> >> public class GenericFlatMapper<T> implements FlatMapFunction<SortedMap<T, >> Long>, Tuple2<T, Long>> { >> @Override >> public void flatMap(SortedMap<T, Long> m, Collector<Tuple2<T, Long>> >> out) throws Exception { >> for (Map.Entry<T, Long> entry : m.entrySet()) { >> out.collect(Tuple2.of(entry.getKey(), entry.getValue())); >> } >> } >> } >> >> >> *Exception in thread "main" >> org.apache.flink.api.common.functions.InvalidTypesException: The return >> type of function could not be determined automatically, due to type >> erasure. You can give type information hints by using the returns(...) >> method on the result of the transformation call, or by letting your >> function implement the 'ResultTypeQueryable' interface.* >> >> *...* >> *Caused by: org.apache.flink.api.common.functions.InvalidTypesException: >> Type of TypeVariable 'T' in 'class GenericFlatMapper' could not be >> determined. This is most likely a type erasure problem. The type extraction >> currently supports types with generic variables only in cases where all >> variables in the return type can be deduced from the input type(s).* >> >> This puzzles me as Flink should be able to infer the type from arguments. >> I know returns(...) or other workarounds to give type hint, but they are >> kind of verbose. Any suggestions? >> >> >