Hi Frank,

input should be of type DataSet[(BSONWritable, BSONWritable)], so each element is a Tuple2[BSONWritable, BSONWritable], right?

Something like this should work:

input.map( pair => pair._1.toString )

Pair is a Tuple2[BSONWritable, BSONWritable], and pair._1 accesses the key of the pair.

Alternatively you can also add an import

org.apache.flink.api.scala.extensions._

and then you can do

input.mapWith { case (x, y) => x }

Best, Fabian

2016-09-08 18:30 GMT+02:00 Frank Dekervel <ker...@gmail.com>:

> Hello,
>
> i'm new to flink, and i'm trying to get a mongodb hadoop input format
> working in scala. However, i get lost in the scala generics system ...
> could somebody help me ?
>
> Code is below, neither version works (compile error at the "map" call),
> either because of method not applicable either because of ambiguous
> reference to overloaded method map (flink 1.0.3)
>
> Thanks already
> greetings,
> Frank
>
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.scala.DataSet;
> import org.apache.flink.api.scala.ExecutionEnvironment;
> import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;
>
> import org.apache.hadoop.mapred.JobConf;
> import org.bson.BSONObject;
>
> import com.mongodb.BasicDBObject;
> import com.mongodb.hadoop.io.BSONWritable;
> import com.mongodb.hadoop.mapred.MongoInputFormat;
>
> val hdIf = new HadoopInputFormat(new MongoInputFormat(),classOf[BSONWritable],
> classOf[BSONWritable], new JobConf())
>
> hdIf.getJobConf().set("mongo.input.uri",
> "mongodb://localhost:27017/handling.event");
>
> val input = env.createInput(hdIf);
>
> def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
> return t1.toString()
> }
>
> // does not work
> //input.map mapfunc
>
> // does not work either
> input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
> // does not work either
> //input.map ( (t1, t2) => t1 )
>
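
P.S. The difference between a two-argument function and a function of one pair can be tried out without Flink or MongoDB. The sketch below is only an illustration under stated assumptions: a plain Scala Seq stands in for the DataSet, Strings stand in for BSONWritable, and the names TupleMapSketch, viaAccessor, viaPattern and viaTupled are made up for this example. With Scala 2 (which Flink 1.0.3 uses), a literal like (k, v) => k is a Function2 and is not accepted where a function of a single Tuple2 is expected.

```scala
object TupleMapSketch {
  // Stand-in for DataSet[(BSONWritable, BSONWritable)]: a plain Seq of pairs.
  // Strings replace BSONWritable so this runs without Flink or MongoDB.
  val input: Seq[(String, String)] = Seq(("key1", "doc1"), ("key2", "doc2"))

  // A two-argument function value (a Function2), analogous to mapfunc in the
  // quoted code. It cannot be passed to map as-is, because map expects a
  // function taking ONE argument, the pair itself.
  val mapfunc: (String, String) => String = (k, _) => k

  // Option 1: take the pair as a single argument and use the tuple accessor.
  val viaAccessor: Seq[String] = input.map(pair => pair._1)

  // Option 2: destructure the pair with a pattern-matching function literal.
  val viaPattern: Seq[String] = input.map { case (k, _) => k }

  // Option 3: adapt the existing two-argument function with .tupled,
  // which turns it into a function of one Tuple2.
  val viaTupled: Seq[String] = input.map(mapfunc.tupled)
}
```

All three variants extract the first element of each pair. On a Flink DataSet, the first form corresponds to input.map( pair => pair._1.toString ) above; the pattern-matching form is what mapWith from the extensions import is meant for, since the overloaded map on DataSet may not accept a { case ... } literal directly.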