Hello Fabian,

Thanks, your solution does indeed work. However, I don't understand why. When I replace the lambda with an explicit function:
def mapfunc2(pair: Tuple2[BSONWritable, BSONWritable]): String = {
  return pair._1.toString
}

input.map(mapfunc2)

I get the error below, which seemingly indicates that my method call matches both the Scala version (the first overloaded method) and the Java version (the second one in the error message, which takes a MapFunction).

This was also the error I got when doing the following, which looks the most logical to me:

def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
  return t1.toString()
}

input.map(mapfunc)

It would seem logical to me to decompose the pair into two separate arguments, which is what the Java version of the example at https://github.com/okkam-it/flink-mongodb-test also does.

This is the error message:

both method map in class DataSet of type [R](fun: ((com.mongodb.hadoop.io.BSONWritable, com.mongodb.hadoop.io.BSONWritable)) => R)(implicit evidence$4: org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit evidence$5: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
and method map in class DataSet of type [R](mapper: org.apache.flink.api.common.functions.MapFunction[(com.mongodb.hadoop.io.BSONWritable, com.mongodb.hadoop.io.BSONWritable),R])(implicit evidence$2: org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit evidence$3: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
match expected type ?

Thanks!
Frank

On Thu, Sep 8, 2016 at 6:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Frank,
>
> input should be of type DataSet[(BSONWritable, BSONWritable)], so a
> Tuple2[BSONWritable, BSONWritable], right?
>
> Something like this should work:
>
> input.map( pair => pair._1.toString )
>
> pair is a Tuple2[BSONWritable, BSONWritable], and pair._1 accesses the
> key of the pair.
>
> Alternatively, you can also add
>
> import org.apache.flink.api.scala.extensions._
>
> and then you can do
>
> input.mapWith { case (x, y) => x }
>
> Best,
> Fabian
>
>
> 2016-09-08 18:30 GMT+02:00 Frank Dekervel <ker...@gmail.com>:
>
>> Hello,
>>
>> I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format
>> working in Scala. However, I get lost in the Scala generics system...
>> Could somebody help me?
>>
>> The code is below. Neither version works (compile error at the "map"
>> call), either because the method is not applicable or because of an
>> ambiguous reference to the overloaded method map (Flink 1.0.3).
>>
>> Thanks already!
>> Greetings,
>> Frank
>>
>>
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.scala.DataSet
>> import org.apache.flink.api.scala.ExecutionEnvironment
>> import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat
>>
>> import org.apache.hadoop.mapred.JobConf
>> import org.bson.BSONObject
>>
>> import com.mongodb.BasicDBObject
>> import com.mongodb.hadoop.io.BSONWritable
>> import com.mongodb.hadoop.mapred.MongoInputFormat
>>
>> val hdIf = new HadoopInputFormat(new MongoInputFormat(),
>>   classOf[BSONWritable], classOf[BSONWritable], new JobConf())
>>
>> hdIf.getJobConf().set("mongo.input.uri",
>>   "mongodb://localhost:27017/handling.event")
>>
>> val input = env.createInput(hdIf)
>>
>> def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
>>   return t1.toString()
>> }
>>
>> // does not work
>> // input.map(mapfunc)
>>
>> // does not work either
>> input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
>>
>> // does not work either
>> // input.map( (t1, t2) => t1 )
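For completeness, a minimal sketch of one way the ambiguity can be avoided, assuming the same input: DataSet[(BSONWritable, BSONWritable)] as in the quoted code and the implicits from import org.apache.flink.api.scala._ in scope: ascribing an explicit Scala function type to the value means no eta-expansion against two candidate overloads is needed, so only the first overload from the error message above can match.

import org.apache.flink.api.scala._          // implicit TypeInformation[String]
import com.mongodb.hadoop.io.BSONWritable

// Sketch: a value of function type ((BSONWritable, BSONWritable)) => String
// conforms only to map[R](fun: ((BSONWritable, BSONWritable)) => R), so the
// call resolves without ambiguity.
val mapfunc2: ((BSONWritable, BSONWritable)) => String =
  pair => pair._1.toString

val keys: DataSet[String] = input.map(mapfunc2)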
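Alternatively, a sketch (same assumptions as above) that targets the Java-style overload explicitly by implementing MapFunction, the second signature in the error message, which also removes the ambiguity:

import org.apache.flink.api.common.functions.MapFunction

// Sketch: an explicit MapFunction instance matches only the overload
// map[R](mapper: MapFunction[(BSONWritable, BSONWritable), R]).
val keys2 = input.map(new MapFunction[(BSONWritable, BSONWritable), String] {
  override def map(pair: (BSONWritable, BSONWritable)): String =
    pair._1.toString
})

Either way, map expects a single argument of the tuple type; decomposing the pair into two separate parameters only works via pattern matching, as in Fabian's mapWith example.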