Hi,
just an idea: the source code documentation states that
projectFirst and projectSecond lose type information, which would
explain why your reduceGroup call expects a GroupReduceFunction<Tuple, R>.
I found an example [1] that calls .types() to define the returned types,
but that method is deprecated. What I would try instead is to replace
projectFirst and projectSecond with a JoinFunction and build the
Tuple2<String, Float> "manually", like so:
actors.map(new JoinNames())
.join(weightedRatings)
.where(1).equalTo(0)
.with(new JoinFunction<
        TypeLeft,               // output type of JoinNames()
        Tuple2<String, Float>,  // type of weightedRatings
        Tuple2<String, Float>>() {
@Override
public Tuple2<String, Float> join(
TypeLeft left,
Tuple2<String, Float> right) throws Exception {
return new Tuple2<>(left.f0, right.f1);
}
})
.withForwardedFieldsFirst("f0")
.withForwardedFieldsSecond("f1")
.groupBy(0)
.reduceGroup(new MeanRatingCalculator())
.first(10)
.print();
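The reason an explicit JoinFunction helps, as far as I understand Flink's type extraction, is that the type arguments of an anonymous subclass are recorded in its class file and can be read back via reflection, whereas a projected result carries no such information. A small plain-Java sketch of that mechanism (no Flink; class and variable names are my own):

```java
import java.lang.reflect.ParameterizedType;
import java.util.function.Function;

public class GenericSupertypeDemo {
    public static void main(String[] args) {
        // An anonymous subclass fixes the type arguments of its
        // superinterface; unlike ordinary instantiations, these
        // survive erasure because they are part of the class metadata.
        Function<String, Integer> f = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        ParameterizedType iface =
                (ParameterizedType) f.getClass().getGenericInterfaces()[0];
        // prints "class java.lang.String"
        System.out.println(iface.getActualTypeArguments()[0]);
    }
}
```

This is, to my understanding, the same trick that makes `new JoinFunction<...>() { ... }` work: the declared type arguments are readable at runtime, so the downstream groupBy/reduceGroup sees a proper Tuple2<String, Float> instead of a bare Tuple.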
Hope this helps.
Best,
Martin
[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/examples.html#relational-query
On 01.11.2015 13:52, Lea Helmers wrote:
Hi!
When I try to apply a GroupReduce function to a data set, I get an error.
The data set is created like this:
DataSet<Tuple3<String, String, String>> actorsTemp =
env.readCsvFile("/home/lea/Documents/impro3_ws15/actors.tsv")
.fieldDelimiter("\t")
.includeFields("1110")
.types(String.class, String.class, String.class);
DataSet<Tuple3<String, String, String>> actresses =
env.readCsvFile("/home/lea/Documents/impro3_ws15/actresses.tsv")
.fieldDelimiter("\t")
.includeFields("1110")
.types(String.class, String.class, String.class);
DataSet<Tuple3<Float, Float, String>> ratings =
env.readCsvFile("/home/lea/Documents/impro3_ws15/ratings.tsv")
.fieldDelimiter("\t")
.includeFields("0111")
.types(Float.class, Float.class, String.class)
.filter(new NumberVotesFilter());
//merge actors and actresses
DataSet<Tuple3<String, String, String>> actors =
actorsTemp.union(actresses);
//create weighted rating
DataSet<Tuple2<String, Float>> weightedRatings =
ratings.map(new WeightedRatingCalculator());
This is what I'm trying in the main method:
actors.map(new JoinNames())
.join(weightedRatings)
.where(1).equalTo(0)
.projectFirst(0).projectSecond(1)
.groupBy(0)
.reduceGroup(new MeanRatingCalculator())
.first(10).print();
And here is the GroupReduce function I wrote:
public static class MeanRatingCalculator implements
GroupReduceFunction<Tuple2<String, Float>, Tuple3<String, Float, Integer>> {
public void reduce(Iterable<Tuple2<String, Float>> ratedActors,
Collector<Tuple3<String, Float, Integer>> out) throws Exception {
String name = null;
Float ratings = 0F;
int numberOfMovies = 0;
for (Tuple2<String, Float> a : ratedActors) {
//store the name
name = a.f0;
//update the sum of the ratings and number of movies
ratings += a.f1;
numberOfMovies++;
}
// emit name, average rating and number of films
out.collect(new Tuple3<String, Float, Integer>(name,
ratings/(float)numberOfMovies, numberOfMovies));
}
}
I get the following error message when I try to compile the code:
java: method reduceGroup in class
org.apache.flink.api.java.operators.UnsortedGrouping<T> cannot be
applied to given types;
required:
org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
found: de.tub.dima.TopActors.MeanRatingCalculator
reason: no instance(s) of type variable(s) R exist so that argument type
de.tub.dima.TopActors.MeanRatingCalculator conforms to formal parameter
type
org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
I can't figure out what the problem might be and would be very grateful
for any help! I hope I have given all the necessary information. I'm
using Ubuntu 14.04 and IntelliJ IDEA as my IDE.
Thank you very much,
Lea