Hi Till,

I am using Flink 0.10.1 and, if I am not wrong, it corresponds to the
1.0-SNAPSHOT you mentioned.

[image: Inline image 1]

If I am wrong, please suggest what I should do to fix it.

Thanks & Regards
Biplob Biswas

On Mon, Jan 18, 2016 at 11:23 AM, Till Rohrmann <trohrm...@apache.org>
wrote:

> Hi Biplob,
>
> which version of Flink are you using? With version 1.0-SNAPSHOT, I cannot
> reproduce your problem.
>
> Cheers,
> Till
>
> On Sun, Jan 17, 2016 at 4:56 PM, Biplob Biswas <revolutioni...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am getting the following exception when I use the map function:
>>
>> Exception in thread "main"
>>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>>> type of function 'computeWeightedDistribution(GraphWeighted.java:73)' could
>>> not be determined automatically, due to type erasure. You can give type
>>> information hints by using the returns(...) method on the result of the
>>> transformation call, or by letting your function implement the
>>> 'ResultTypeQueryable' interface.
>>> at org.apache.flink.api.java.DataSet.getType(DataSet.java:176)
>>> at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:692)
>>> at aim3.GraphWeighted.computeWeightedDistribution(GraphWeighted.java:74)
>>> at aim3.SlashdotZooInDegree.main(SlashdotZooInDegree.java:39)
>>> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
>>> Input mismatch: Basic type 'Integer' expected but was 'Long'.
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:767)
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:276)
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:110)
>>> at org.apache.flink.api.java.DataSet.map(DataSet.java:213)
>>> at aim3.GraphWeighted.computeWeightedDistribution(GraphWeighted.java:73)
>>> ... 1 more
>>
>>
>>
>> This is the part of the code which I am trying to run :
>>
>>> DataSet<Tuple2<String, Long>> distinctVertex = sourceVertex
>>>     .union(destinationVertex)
>>>     .groupBy(0)
>>>     .aggregate(Aggregations.SUM, 1);
>>>
>>> // Compute the degrees (degree, count)
>>> DataSet<Tuple2<Long, Integer>> degreeCount = distinctVertex
>>>     .map(new DegreeMapper())
>>>     .groupBy(0)
>>>     .aggregate(Aggregations.SUM, 1);
>>
>>
>>
>> The error occurs at this line: *.map(new DegreeMapper())*
>>
>> Also, the degree mapper is a simple map function which emits the second
>> column and 1, as follows:
>>
>>>
>>> public static class DegreeMapper
>>>         implements MapFunction<Tuple2<String, Long>, Tuple2<Long, Integer>> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     public Tuple2<Long, Integer> map(Tuple2<String, Long> input) throws Exception {
>>>         return new Tuple2<Long, Integer>(input.f1, 1);
>>>     }
>>> }
>>
>>
>>
>> Now I am lost as to what I did wrong and why I am getting that error. Any
>> help would be appreciated.
>>
>> Thanks a lot.
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>
>
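For reference, the mechanism named in the exception message quoted above (type erasure) can be illustrated in plain Java. This is only a minimal sketch and not Flink code: `Mapper` is a hypothetical stand-in for Flink's MapFunction, `Map.Entry` stands in for Tuple2, and the names `ErasureSketch` and `declaredTypeArguments` are made up for illustration. It shows that a named class which declares concrete type arguments keeps them readable through reflection, while a lambda on a typical JDK does not.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

class ErasureSketch {

    // Hypothetical stand-in for Flink's MapFunction; NOT the real interface.
    interface Mapper<I, O> {
        O map(I input) throws Exception;
    }

    // Mirrors the DegreeMapper from the thread; Map.Entry replaces Tuple2 (assumption).
    static class DegreeMapper
            implements Mapper<Map.Entry<String, Long>, Map.Entry<Long, Integer>> {
        @Override
        public Map.Entry<Long, Integer> map(Map.Entry<String, Long> input) {
            // Emit (second field, 1), as the mapper in the thread does.
            return new SimpleEntry<>(input.getValue(), 1);
        }
    }

    // Returns the type arguments declared on the Mapper interface,
    // or null when the class only implements the raw (erased) interface.
    static Type[] declaredTypeArguments(Mapper<?, ?> mapper) {
        for (Type t : mapper.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Mapper.class) {
                return ((ParameterizedType) t).getActualTypeArguments();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Named class: the declared input and output types survive in class metadata.
        Type[] named = declaredTypeArguments(new DegreeMapper());
        System.out.println("declared input:  " + named[0]);
        System.out.println("declared output: " + named[1]);

        // Lambda: the generated class implements only the raw interface.
        Mapper<Map.Entry<String, Long>, Map.Entry<Long, Integer>> lambda =
                e -> new SimpleEntry<>(e.getValue(), 1);
        System.out.println("lambda type arguments: " + declaredTypeArguments(lambda));
    }
}
```

Since the DegreeMapper in the thread does declare concrete type arguments, the "Caused by" line (Integer vs. Long) may be the more useful clue: it suggests checking whether the input type the mapper declares matches the element type of the data set it is applied to.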
