Hello, I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format working in Scala. However, I get lost in the Scala generics system... could somebody help me?
The code is below. Neither version works: each attempt fails with a compile error at the "map" call, either because the method is not applicable or because of an ambiguous reference to the overloaded method map (Flink 1.0.3).

Thanks in advance,
greetings,
Frank

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.scala.DataSet;
    import org.apache.flink.api.scala.ExecutionEnvironment;
    import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.bson.BSONObject;
    import com.mongodb.BasicDBObject;
    import com.mongodb.hadoop.io.BSONWritable;
    import com.mongodb.hadoop.mapred.MongoInputFormat;

    val hdIf = new HadoopInputFormat(new MongoInputFormat(), classOf[BSONWritable], classOf[BSONWritable], new JobConf())
    hdIf.getJobConf().set("mongo.input.uri", "mongodb://localhost:27017/handling.event");

    val input = env.createInput(hdIf);

    def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
      return t1.toString()
    }

    // does not work
    //input.map mapfunc

    // does not work either
    input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )

    // does not work either
    //input.map ( (t1, t2) => t1 )