Hi Frank,

input should be of type DataSet[(BSONWritable, BSONWritable)], so each element
is a Tuple2[BSONWritable, BSONWritable], right?

Something like this should work:

input.map( pair => pair._1.toString )

Here, pair is a Tuple2[BSONWritable, BSONWritable], and pair._1 accesses the
key of the pair.
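
Applied to your snippet, a minimal sketch could look like this (assuming env is
your ExecutionEnvironment and that import org.apache.flink.api.scala._ is in
scope, so the implicit TypeInformation for the map result is available):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat

import org.apache.hadoop.mapred.JobConf

import com.mongodb.hadoop.io.BSONWritable
import com.mongodb.hadoop.mapred.MongoInputFormat

val env = ExecutionEnvironment.getExecutionEnvironment

val hdIf = new HadoopInputFormat(new MongoInputFormat(),
  classOf[BSONWritable], classOf[BSONWritable], new JobConf())
hdIf.getJobConf().set("mongo.input.uri",
  "mongodb://localhost:27017/handling.event")

// DataSet[(BSONWritable, BSONWritable)]
val input = env.createInput(hdIf)

// map takes a function with a single Tuple2 parameter, not two parameters
val keys: DataSet[String] = input.map( pair => pair._1.toString )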

Alternatively, you can add the import
org.apache.flink.api.scala.extensions._

and then do

input.mapWith { case (x, y) => x }
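
mapWith pattern-matches on the tuple, so key and value can be bound directly.
For example, to again extract the key as a String (just a sketch):

input.mapWith { case (key, _) => key.toString }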

Best, Fabian


2016-09-08 18:30 GMT+02:00 Frank Dekervel <ker...@gmail.com>:

> Hello,
>
> I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format
> working in Scala. However, I get lost in the Scala generics system ...
> could somebody help me?
>
> Code is below; neither version works (compile error at the "map" call),
> either because the method is not applicable or because of an ambiguous
> reference to the overloaded method map (Flink 1.0.3).
>
> Thanks already
> greetings,
> Frank
>
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.scala.DataSet;
> import org.apache.flink.api.scala.ExecutionEnvironment;
> import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;
>
> import org.apache.hadoop.mapred.JobConf;
> import org.bson.BSONObject;
>
> import com.mongodb.BasicDBObject;
> import com.mongodb.hadoop.io.BSONWritable;
> import com.mongodb.hadoop.mapred.MongoInputFormat;
>
> val hdIf = new HadoopInputFormat(new MongoInputFormat(),classOf[BSONWritable],
> classOf[BSONWritable], new JobConf())
>
> hdIf.getJobConf().set("mongo.input.uri",
>     "mongodb://localhost:27017/handling.event");
>
> val input = env.createInput(hdIf);
>
> def mapfunc(t1: BSONWritable, t2: BSONWritable): String  = {
>     return t1.toString()
> }
>
> // does not work
> //input.map mapfunc
>
> // does not work either
> input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
> // does not work either
> //input.map ( (t1, t2) => t1 )
>
>
