Hi Till, I am using flink 0.10.1 and if i am not wrong it corresponds to the 1.0-Snapshot you mentioned.
[image: Inline image 1] If wrong, please suggest what should I do to fix it. Thanks & Regards Biplob Biswas On Mon, Jan 18, 2016 at 11:23 AM, Till Rohrmann <trohrm...@apache.org> wrote: > Hi Biplob, > > which version of Flink are you using? With version 1.0-SNAPSHOT, I cannot > reproduce your problem. > > Cheers, > Till > > > On Sun, Jan 17, 2016 at 4:56 PM, Biplob Biswas <revolutioni...@gmail.com> > wrote: > >> Hi, >> >> I am getting the following exception when i am using the map function >> >> Exception in thread "main" >>> org.apache.flink.api.common.functions.InvalidTypesException: The return >>> type of function 'computeWeightedDistribution(GraphWeighted.java:73)' could >>> not be determined automatically, due to type erasure. You can give type >>> information hints by using the returns(...) method on the result of the >>> transformation call, or by letting your function implement the >>> 'ResultTypeQueryable' interface. >>> at org.apache.flink.api.java.DataSet.getType(DataSet.java:176) >>> at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:692) >>> at aim3.GraphWeighted.computeWeightedDistribution(GraphWeighted.java:74) >>> at aim3.SlashdotZooInDegree.main(SlashdotZooInDegree.java:39) >>> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: >>> Input mismatch: Basic type 'Integer' expected but was 'Long'. >>> at >>> org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:767) >>> at >>> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:276) >>> at >>> org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:110) >>> at org.apache.flink.api.java.DataSet.map(DataSet.java:213) >>> at aim3.GraphWeighted.computeWeightedDistribution(GraphWeighted.java:73) >>> ... 1 more >> >> >> >> This is the part of the code which I am trying to run : >> >> DataSet<Tuple2<String, Long>> distinctVertex = sourceVertex >>> .union(destinationVertex) >>> .groupBy(0) >>> .aggregate(Aggregations.SUM, 1); >>> // Compute the degrees (degree, count) >>> >>> DataSet<Tuple2<Long, Integer>> degreeCount = distinctVertex >>> .map(new DegreeMapper()) >>> .groupBy(0) >>> .aggregate(Aggregations.SUM, 1); >> >> >> >> and the error I am getting is at this line *.map(new DegreeMapper())* >> >> Also, the degree mapper is a simply map function which emits the second >> column and 1 as follows: >> >>> >>> public static class DegreeMapper implements >>> MapFunction<Tuple2<String, Long>, Tuple2<Long, Integer>> { >>> private static final long serialVersionUID = 1L; >>> public Tuple2<Long, Integer> map(Tuple2<String, Long> input) throws >>> Exception { >>> return new Tuple2<Long, Integer>(input.f1, 1); >>> } >>> } >> >> >> >> Now I am lost as to what I did wrong and why I am getting that error, any >> help would be appreciated. >> >> Thanks a lot. >> >> Thanks & Regards >> Biplob Biswas >> > >