Hello,

I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format
working in Scala. However, I get lost in the Scala generics system.
Could somebody help me?

The code is below. Neither version works: each fails with a compile error
at the "map" call, either because the method is not applicable or because
of an ambiguous reference to the overloaded method map (Flink 1.0.3).

Thanks in advance.
Greetings,
Frank


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.api.scala.ExecutionEnvironment;
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;

import org.apache.hadoop.mapred.JobConf;
import org.bson.BSONObject;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoInputFormat;

val hdIf = new HadoopInputFormat(new MongoInputFormat(),
    classOf[BSONWritable], classOf[BSONWritable], new JobConf())

hdIf.getJobConf().set("mongo.input.uri",
    "mongodb://localhost:27017/handling.event");

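// env was not defined in the snippet as posted
val env = ExecutionEnvironment.getExecutionEnvironment
val input = env.createInput(hdIf)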

def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
    t1.toString()
}

// does not work
//input.map mapfunc

// does not work either
input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
// does not work either
//input.map ( (t1, t2) => t1 )
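
A note on the likely cause, offered as a sketch rather than a confirmed fix: the Scala HadoopInputFormat produces elements of type (key, value), so "input" should be a DataSet of Tuple2, and map expects a function of ONE tuple parameter, not two parameters. The example below reproduces that shape with plain Scala collections so it runs without Flink or MongoDB. Doc, records and TupleMapSketch are hypothetical stand-ins (Doc for BSONWritable, records for the DataSet), and Scala 2 is assumed.

```scala
// Hypothetical stand-in for BSONWritable, so the sketch needs no MongoDB jars.
final case class Doc(id: String)

object TupleMapSketch {
  // Stand-in for the DataSet[(BSONWritable, BSONWritable)] from createInput:
  // every element is a single (key, value) tuple.
  val records: Seq[(Doc, Doc)] = Seq(
    (Doc("id-1"), Doc("value-1")),
    (Doc("id-2"), Doc("value-2"))
  )

  // Two-parameter function, same shape as mapfunc in the original post.
  def mapfunc(key: Doc, value: Doc): String = key.id

  // records.map((k: Doc, v: Doc) => k.id) is rejected by Scala 2, because a
  // Function2 is not a function from one tuple to a result.

  // Option 1: take the tuple as one parameter and read its fields.
  val viaField: Seq[String] = records.map(pair => pair._1.id)

  // Option 2: adapt the two-parameter function with Function2.tupled.
  val viaTupled: Seq[String] = records.map((mapfunc _).tupled)

  def main(args: Array[String]): Unit = {
    viaField.foreach(println)
    viaTupled.foreach(println)
  }
}
```

Applied to the Flink code above, the first option would presumably read input.map(pair => pair._1.toString). This has not been tested against Flink 1.0.3, and it assumes org.apache.flink.api.scala._ is imported so that the implicit type information for the result is available. A pattern-matching literal such as { case (k, v) => ... } may not type-check against the overloaded DataSet.map on older Scala versions, which is why the plain single-parameter lambda is used here.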
