Hi,

Just an idea: the source code documentation states that projectFirst and projectSecond lose type information, which could explain why your reduceGroup call expects a GroupReduceFunction<Tuple, R>.
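
To illustrate what I mean, here is a minimal sketch in plain Java. The classes Tuple, Tuple2, GroupReduceFunction and Grouping below are simplified stand-ins I made up for this mail, not Flink's real classes, and project() only imitates a method whose output type is a free type parameter. If such a call is chained directly, the compiler has nothing to infer the type from and should fall back to the bound Tuple; assigning the result to a typed variable first gives it the missing information.

```java
import java.util.ArrayList;
import java.util.List;

class ProjectionTypeDemo {
    // Hypothetical stand-ins for illustration only, not Flink's classes.
    static class Tuple {}

    static class Tuple2<A, B> extends Tuple {
        final A f0;
        final B f1;
        Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    interface GroupReduceFunction<IN, OUT> {
        OUT reduce(Iterable<IN> values);
    }

    static class Grouping<T> {
        final List<T> data;
        Grouping(List<T> data) { this.data = data; }

        // Accepts only functions whose input type is exactly T.
        <R> R reduceGroup(GroupReduceFunction<T, R> f) { return f.reduce(data); }
    }

    // Averages the f1 values of all tuples in the group.
    static class MeanRating implements GroupReduceFunction<Tuple2<String, Float>, Float> {
        @Override
        public Float reduce(Iterable<Tuple2<String, Float>> values) {
            float sum = 0f;
            int n = 0;
            for (Tuple2<String, Float> v : values) {
                sum += v.f1;
                n++;
            }
            return sum / n;
        }
    }

    // The element type OUT is chosen by the caller, not derived from the argument.
    @SuppressWarnings("unchecked")
    static <OUT extends Tuple> Grouping<OUT> project(List<? extends Tuple> rows) {
        return new Grouping<>((List<OUT>) rows);
    }

    static float demo() {
        List<Tuple2<String, Float>> rows = new ArrayList<>();
        rows.add(new Tuple2<>("a", 2.0f));
        rows.add(new Tuple2<>("a", 4.0f));

        // Chained directly, OUT has no target type and defaults to Tuple,
        // so this line should be rejected by the compiler:
        // project(rows).reduceGroup(new MeanRating());

        // With a typed variable as the target, OUT is inferred as Tuple2<String, Float>.
        Grouping<Tuple2<String, Float>> typed = project(rows);
        return typed.reduceGroup(new MeanRating());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3.0
    }
}
```

The rejected line fails for the same kind of reason as in your error message: a function declared for Tuple2<String, Float> is not a function for plain Tuple, because Java generics are invariant.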

I found an example [1] that calls .types() to define the returned types, but this method is deprecated. Instead, I would try replacing projectFirst and projectSecond with a JoinFunction that emits Tuple2<String, Float> "manually", like so:

actors.map(new JoinNames())
  .join(weightedRatings)
  .where(1).equalTo(0)
  .with(new JoinFunction<
        TypeLeft,  // placeholder: output type of JoinNames(), must be a tuple type
        Tuple2<String, Float>,
        Tuple2<String, Float>>() {

        @Override
        public Tuple2<String, Float> join(
          TypeLeft left,
          Tuple2<String, Float> right) throws Exception {
          // keep the name from the left side and the rating from the right side
          return new Tuple2<>(left.f0, right.f1);
        }
  })
  .withForwardedFieldsFirst("f0")
  .withForwardedFieldsSecond("f1")
  .groupBy(0)
  .reduceGroup(new MeanRatingCalculator())
  .first(10)
  .print();

Hope this helps.

Best,
Martin

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/examples.html#relational-query

On 01.11.2015 13:52, Lea Helmers wrote:
Hi!

When I try to apply a groupReduce function to a data set I get an error.

The data set is created like this:

DataSet<Tuple3<String, String, String>> actorsTemp =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/actors.tsv")
           .fieldDelimiter("\t")
           .includeFields("1110")
           .types(String.class, String.class, String.class);

DataSet<Tuple3<String, String, String>> actresses =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/actresses.tsv")
           .fieldDelimiter("\t")
           .includeFields("1110")
           .types(String.class, String.class, String.class);

DataSet<Tuple3<Float, Float, String>> ratings =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/ratings.tsv")
           .fieldDelimiter("\t")
           .includeFields("0111")
           .types(Float.class, Float.class, String.class)
           .filter(new NumberVotesFilter());

// merge actors and actresses
DataSet<Tuple3<String, String, String>> actors = actorsTemp.union(actresses);

// create weighted rating
DataSet<Tuple2<String, Float>> weightedRatings =
        ratings.map(new WeightedRatingCalculator());

THIS IS WHAT I'M TRYING IN THE MAIN METHOD:

                     actors.map(new JoinNames())
                           .join(weightedRatings)
                           .where(1).equalTo(0)
                           .projectFirst(0).projectSecond(1)
                           .groupBy(0)
                           .reduceGroup(new MeanRatingCalculator())
                           .first(10).print();


And here is the GroupReduce function I wrote:

public static class MeanRatingCalculator implements
        GroupReduceFunction<Tuple2<String, Float>, Tuple3<String, Float, Integer>> {

    public void reduce(Iterable<Tuple2<String, Float>> ratedActors,
            Collector<Tuple3<String, Float, Integer>> out) throws Exception {

        String name = null;
        Float ratings = 0F;
        int numberOfMovies = 0;
        for (Tuple2<String, Float> a : ratedActors) {
            // store the name
            name = a.f0;
            // update the sum of the ratings and number of movies
            ratings += a.f1;
            numberOfMovies++;
        }
        // emit name, average rating and number of films
        out.collect(new Tuple3<String, Float, Integer>(name,
                ratings / (float) numberOfMovies, numberOfMovies));
    }
}


I get the following error message when I try to compile the code:

java: method reduceGroup in class
org.apache.flink.api.java.operators.UnsortedGrouping<T> cannot be
applied to given types;
   required:
org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
   found: de.tub.dima.TopActors.MeanRatingCalculator
reason: no instance(s) of type variable(s) R exist so that argument type
de.tub.dima.TopActors.MeanRatingCalculator conforms to formal parameter
type
org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>


I can't figure out what the problem might be and would be very grateful
for any help!! I hope I have given all the necessary information. I'm
using Ubuntu 14.04 and IntelliJ Idea as IDE.

Thank you very much,

Lea
