2016-02-15 14:02 GMT+01:00 Sun, Rui <[email protected]>:
> On computation, RRDD launches one R process for each partition, so there
> won't be thread-safe issue
>
> Could you give more details on your new environment?
Running on EC2, I start the executors via
/usr/bin/R CMD javareconf -e "/usr/lib/spark/sbin/start-master.sh"
I invoke R roughly as follows:
/** Wrapper around an embedded R engine that evaluates the R function `fit`
  * (loaded from the classpath resource `r/fit.R`) on a curve of doubles.
  */
object R {

  /** One curve sample; only its `value` is handed to R. */
  case class Element(value: Double)

  /** Reads the classpath resource `r/fit.R` into a string.
    *
    * The underlying stream is always closed, and a missing resource is reported
    * with an explicit message instead of a bare NullPointerException.
    */
  private def loadFitScript(): String = {
    val stream = Option(this.getClass().getClassLoader().getResourceAsStream("r/fit.R"))
      .getOrElse(throw new IllegalStateException("classpath resource r/fit.R not found"))
    val source = scala.io.Source.fromInputStream(stream)
    try source.mkString
    finally source.close()
  }

  /** The R engine used by [[fit]].
    *
    * Reuses the engine returned by `REngine.getLastEngine()` when there is one;
    * otherwise creates a new `JRIEngine` and evaluates `r/fit.R` in it.
    * Note that the script is only evaluated on the newly created engine, not
    * on a reused one.
    */
  lazy val re: REngine = Option(REngine.getLastEngine()).getOrElse {
    val eng = new JRI.JRIEngine()
    eng.parseAndEval(loadFitScript())
    eng
  }

  /** Runs the R function `fit` on a data frame built from `curve`.
    *
    * The whole call is synchronized on this object, so only one evaluation
    * goes through this wrapper at a time.
    *
    * @param curve samples whose values form the `curve` column of the data frame
    * @return the fitting built from the `values` entry of R's result, or `None`
    *         when the result list has no names or no `values` entry
    */
  def fit(curve: Seq[Element]): Option[Fitting] = synchronized {
    // Use a fresh environment per call so assignments do not leak between calls.
    val env = re.newEnvironment(null, false)
    re.assign("curve", new REXPDouble(curve.map(_.value).toArray), env)
    val df = re.parseAndEval("data.frame(curve=curve)", env, true)
    re.assign("df", df, env)
    val fitted = re.parseAndEval("fit(df)", env, true).asList
    // `keys` is null for an unnamed list; treat that as "no result".
    Option(fitted.keys).flatMap { keys =>
      val byName = keys.map(key => key -> fitted.at(key).asDouble).toMap
      // A missing `values` entry yields None rather than NoSuchElementException.
      byName.get("values").map(v => Fitting(v))
    }
  }
}
where `fit` is wrapped in a UDAF.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]