Hi

I think you should modify initModel() function to getOrCreateModel() and
create the model as singleton object. You may want to refer this link
<http://stackoverflow.com/questions/30450763/spark-streaming-and-connection-pool-implementation>

On Mon, Aug 8, 2016 at 7:44 PM, 莫涛 <mo...@sensetime.com> wrote:

> Hi Ndjido,
>
> Thanks for your reply.
>
> Yes, it is good idea if the model can be broadcast.
>
> I'm working with a built library (on Linux, say classifier.so and
> classifier.h) and it requires the model file is in the local file system.
> As I don't have access to the library code, I write JNI to wrap the
> classifier.
> The model file can be sent to each executor efficiently by addFile and
> getFile.
> But initModel() is still expensive as it actually loads a local file into
> C++ heap memory, which is not serializable.
>
> That's the reason I can not broadcast the model and I have to avoid load
> model as possible as I can.
>
> Best
>
> ------------------------------
> *发件人:* ndj...@gmail.com <ndj...@gmail.com>
> *发送时间:* 2016年8月8日 17:16:27
> *收件人:* 莫涛
> *抄送:* user@spark.apache.org
> *主题:* Re: how to generate a column using mapParition and then add it back
> to the df?
>
>
> Hi MoTao,
> What about broadcasting the model?
>
> Cheers,
> Ndjido.
>
> > On 08 Aug 2016, at 11:00, MoTao <mo...@sensetime.com> wrote:
> >
> > Hi all,
> >
> > I'm trying to append a column to a df.
> > I understand that the new column must be created by
> > 1) using literals,
> > 2) transforming an existing column in df,
> > or 3) generated from udf over this df
> >
> > In my case, the column to be appended is created by processing each row,
> > like
> >
> > val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> > val func = udf {
> >  v: Double => {
> >    val model = initModel()
> >    model.process(v)
> >  }
> > }
> > val df2 = df.withColumn("valueWithBias", func(col("value")))
> >
> > This works fine. However, for performance reason, I want to avoid
> > initModel() for each row.
> > So I come with mapParitions, like
> >
> > val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> > val df2 = df.mapPartitions(rows => {
> >  val model = initModel()
> >  rows.map(row => model.process(row.getAs[Double](0)))
> > })
> > val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
> >
> > But this is wrong as a column of df2 *CANNOT* be appended to df.
> >
> > The only solution I got is to force mapPartitions to return a whole row
> > instead of the new column,
> > ( Something like "row => Row.fromSeq(row.toSeq ++
> > Array(model.process(...)))" )
> > which requires a lot of copy as well.
> >
> > I wonder how to deal with this problem with as few overhead as possible?
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/how-to-generate-a-column-using-
> mapParition-and-then-add-it-back-to-the-df-tp27493.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>



-- 
Best Regards,
Ayan Guha

Reply via email to