Hi, I changed the WordCount example as shown below, and I am wondering why it does not work. Looking at Flink's internal implementation, it makes sense that it fails. However, from an API point of view it should work. Is this a bug, or do I misunderstand the semantics of .returns(...)?
The change is about type inference and generics. I removed the generic type information from the Tokenizer and provide the output type via the returns(...) method. However, I get an exception (see below).

Tokenizer is changed to this (removed generics and added a cast to String):

> public static final class Tokenizer implements FlatMapFunction {
>     public void flatMap(Object value, Collector out) {
>         String[] tokens = ((String) value).toLowerCase().split("\\W+");
>         for (String token : tokens) {
>             if (token.length() > 0) {
>                 out.collect(new Tuple2<String, Integer>(token, 1));
>             }
>         }
>     }
> }

I added a call to returns(...) here:

> DataSet<Tuple2<String, Integer>> counts =
>     text.flatMap(new Tokenizer()).returns("Tuple2<String,Integer>")
>         .groupBy(0).sum(1);

The exception is:

> Exception in thread "main" java.lang.IllegalArgumentException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic types is limited at this point.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:686)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterTypeFromGenericType(TypeExtractor.java:710)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:673)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:365)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:279)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:120)
>     at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:262)
>     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:69)

-Matthias
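
For illustration, here is a minimal plain-Java sketch (not Flink code) of why the failure is plausible. It assumes that the type extractor reads type arguments reflectively from the implemented interface, which the getParameterTypeFromGenericType frame in the stack trace suggests but does not prove. The names RawTypeDemo, Fn, TypedFn, RawFn, and describe are hypothetical stand-ins; Fn plays the role of FlatMapFunction.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class RawTypeDemo {

    // Hypothetical stand-in for a generic function interface such as FlatMapFunction.
    interface Fn<IN, OUT> {
        OUT apply(IN value);
    }

    // Implements the interface with type arguments: they are recorded in the class file.
    static class TypedFn implements Fn<String, Integer> {
        public Integer apply(String value) {
            return value.length();
        }
    }

    // Implements the raw interface: no type arguments are recorded anywhere.
    @SuppressWarnings("rawtypes")
    static class RawFn implements Fn {
        public Object apply(Object value) {
            return ((String) value).length();
        }
    }

    // Reports what reflection can see about the first interface a class implements.
    static String describe(Class<?> clazz) {
        Type iface = clazz.getGenericInterfaces()[0];
        if (iface instanceof ParameterizedType) {
            Type[] args = ((ParameterizedType) iface).getActualTypeArguments();
            return "parameterized: " + args[0].getTypeName() + " -> " + args[1].getTypeName();
        }
        // For a raw implementation, reflection returns a plain Class, not a ParameterizedType.
        return "raw: no type arguments available";
    }

    public static void main(String[] args) {
        System.out.println(describe(TypedFn.class)); // parameterized: java.lang.String -> java.lang.Integer
        System.out.println(describe(RawFn.class));   // raw: no type arguments available
    }
}
```

Under that assumption, the exception would be raised inside flatMap(...) itself, before returns(...) is ever invoked, so the hint arrives too late to help the raw Tokenizer.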