Hi Yukun,
I think the problem with the input type inference is that SortedMap is
treated as a GenericType and not as a Flink native type (like a Tuple or
POJO). This case is not supported at the moment. You can create an issue
if you like; maybe there is a way to support this special type inference case.
Timo
On 09.07.2016 11:55, Yukun Guo wrote:
Hi Robert,
On 9 July 2016 at 00:25, Robert Metzger <rmetz...@apache.org> wrote:
Hi Yukun,
can you also post the code showing how you are invoking the
GenericFlatMapper on the mailing list?
Here is the code defining the topology:
    DataStream<String> stream = ...;
    stream
        .keyBy(new KeySelector<String, Integer>() {
            @Override
            public Integer getKey(String x) throws Exception {
                return x.hashCode() % 10;
            }
        })
        .timeWindow(Time.seconds(10))
        .fold(new TreeMap<String, Long>(), new FoldFunction<String, SortedMap<String, Long>>() {
            @Override
            public SortedMap<String, Long> fold(SortedMap<String, Long> map, String x) {
                Long current = map.get(x);
                Long updated = current != null ? current + 1 : 1;
                map.put(x, updated);
                return map;
            }
        })
        .flatMap(new GenericFlatMapper<String>())
        .returns(new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo()) // throws InvalidTypesException if you comment out this line
        .print();
The Java compiler usually drops the generic types during
compilation ("type erasure"), which is why we cannot infer the types.
The error message implies type extraction should be possible when "all
variables in the return type can be deduced from the input type(s)".
This is true for flatMap(Tuple2<Long, T>, Collector<Tuple2<T,
String>>), but if the signature is changed to void
flatMap(SortedMap<T, Long>, Collector<Tuple2<T, Long>>), type
inference fails.
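
To make the erasure point concrete, here is a minimal plain-Java sketch (no Flink dependency). It shows that two differently parameterized lists share one runtime class, while an anonymous subclass keeps its type argument in the class metadata. The Hint class is a hypothetical stand-in written for this illustration; it is not Flink's TypeHint, although a hint created with a trailing {} plausibly relies on the same mechanism.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-in for an anonymous-subclass type hint; not Flink's TypeHint.
    public abstract static class Hint<T> {
        // Reads the type argument recorded in the subclass's "extends Hint<...>" clause.
        public Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // Two differently parameterized lists share one runtime class after erasure.
    public static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Long> longs = new ArrayList<>();
        return strings.getClass() == longs.getClass();
    }

    // The trailing {} creates an anonymous subclass whose generic superclass
    // is kept in the class file and can be read back via reflection.
    public static String capturedTypeName() {
        return new Hint<List<String>>() {}.captured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        System.out.println(capturedTypeName());
    }
}
```

An instance created as new GenericFlatMapper<String>() is in the first situation: the String argument exists only at the call site and is gone at runtime, so it has to be recovered from somewhere else, such as the input type information or an explicit hint.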
On Fri, Jul 8, 2016 at 12:27 PM, Yukun Guo <gyk....@gmail.com> wrote:
Hi,
When I run code that implements a generic FlatMapFunction,
Flink complains with an InvalidTypesException:
    public class GenericFlatMapper<T> implements FlatMapFunction<SortedMap<T, Long>, Tuple2<T, Long>> {
        @Override
        public void flatMap(SortedMap<T, Long> m, Collector<Tuple2<T, Long>> out) throws Exception {
            for (Map.Entry<T, Long> entry : m.entrySet()) {
                out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
            }
        }
    }
Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException:
The return type of function could not be determined
automatically, due to type erasure. You can give type
information hints by using the returns(...) method on the
result of the transformation call, or by letting your function
implement the 'ResultTypeQueryable' interface.
...
Caused by:
org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'T' in 'class GenericFlatMapper' could
not be determined. This is most likely a type erasure problem.
The type extraction currently supports types with generic
variables only in cases where all variables in the return type
can be deduced from the input type(s).
This puzzles me, as Flink should be able to infer the type from the
arguments. I know about returns(...) and other workarounds for giving
type hints, but they are rather verbose. Any suggestions?
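
One possibly less verbose option is to bind T in a small concrete subclass, so the binding is stored in class metadata rather than erased. The plain-Java sketch below shows only the reflection side of that idea. Mapper, GenericMapper and StringMapper are hypothetical stand-ins for FlatMapFunction, GenericFlatMapper<T> and a subclass such as "class StringFlatMapper extends GenericFlatMapper<String>". Whether the type extractor in the Flink version used here follows such a binding is an assumption that should be checked.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.SortedMap;

public class BoundVariableDemo {

    // Hypothetical stand-in for a function interface with input and output types.
    public interface Mapper<I, O> {}

    // Same shape as GenericFlatMapper<T>: T is still a free variable here.
    public static class GenericMapper<T> implements Mapper<SortedMap<T, Long>, T> {}

    // Concrete subclass that fixes T = String in its class declaration.
    public static class StringMapper extends GenericMapper<String> {}

    // Looking only at the generic class, the output type is the unresolved variable T,
    // even though the instance was created with <String>.
    public static boolean variableIsUnboundOnGenericClass() {
        Object mapper = new GenericMapper<String>();
        ParameterizedType iface =
                (ParameterizedType) mapper.getClass().getGenericInterfaces()[0];
        Type output = iface.getActualTypeArguments()[1];
        return output instanceof TypeVariable;
    }

    // Through the subclass's "extends GenericMapper<String>" the binding is visible.
    public static Type boundTypeOnSubclass() {
        ParameterizedType superType =
                (ParameterizedType) StringMapper.class.getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        System.out.println(variableIsUnboundOnGenericClass());
        System.out.println(boundTypeOnSubclass());
    }
}
```

The trade-off is one extra class per concrete type, in exchange for not repeating a returns(...) hint at every call site.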
--
Freundliche Grüße / Kind Regards
Timo Walther
Follow me: @twalthr
https://www.linkedin.com/in/twalthr