Hi,

I am getting the following exception when i am using the map function

Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: The return
> type of function 'computeWeightedDistribution(GraphWeighted.java:73)' could
> not be determined automatically, due to type erasure. You can give type
> information hints by using the returns(...) method on the result of the
> transformation call, or by letting your function implement the
> 'ResultTypeQueryable' interface.
> at org.apache.flink.api.java.DataSet.getType(DataSet.java:176)
> at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:692)
> at aim3.GraphWeighted.computeWeightedDistribution(GraphWeighted.java:74)
> at aim3.SlashdotZooInDegree.main(SlashdotZooInDegree.java:39)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
> Input mismatch: Basic type 'Integer' expected but was 'Long'.
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:767)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:276)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:110)
> at org.apache.flink.api.java.DataSet.map(DataSet.java:213)
> at aim3.GraphWeighted.computeWeightedDistribution(GraphWeighted.java:73)
> ... 1 more



This is the part of the code which I am trying to run :

DataSet<Tuple2<String, Long>> distinctVertex = sourceVertex
>      .union(destinationVertex)
>      .groupBy(0)
>      .aggregate(Aggregations.SUM, 1);
>         // Compute the degrees (degree, count)
>
>      DataSet<Tuple2<Long, Integer>> degreeCount = distinctVertex
>      .map(new DegreeMapper())
>      .groupBy(0)
>      .aggregate(Aggregations.SUM, 1);



and the error I am getting is at this line *.map(new DegreeMapper())*

Also, the degree mapper is a simply map function which emits the second
column and 1 as follows:

>
>     public static class DegreeMapper implements MapFunction<Tuple2<String,
> Long>, Tuple2<Long, Integer>> {
> private static final long serialVersionUID = 1L;
> public Tuple2<Long, Integer> map(Tuple2<String, Long> input) throws
> Exception {
> return new Tuple2<Long, Integer>(input.f1, 1);
> }
>     }



Now I am lost as to what I did wrong and why I am getting that error, any
help would be appreciated.

Thanks a lot.

Thanks & Regards
Biplob Biswas

Reply via email to