Hi, I think you should change the initModel() function to getOrCreateModel() and create the model as a singleton object. You may want to refer to this link: <http://stackoverflow.com/questions/30450763/spark-streaming-and-connection-pool-implementation>
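A minimal, untested sketch of that idea, where Classifier stands in for the JNI wrapper described below and "model.bin" for the file shipped with addFile (both names are assumptions):

import org.apache.spark.SparkFiles

// Hypothetical JNI wrapper around classifier.so; adjust to the real API.
class Classifier {
  def load(path: String): Unit = ???   // native call: loads the model into C++ heap
  def process(v: Double): Double = ??? // native call: scores one value
}

object ModelHolder {
  // A lazy val is initialized at most once per executor JVM, and its
  // initialization is thread-safe, so all tasks on an executor share it.
  lazy val model: Classifier = {
    val c = new Classifier()
    c.load(SparkFiles.get("model.bin")) // local path of the file shipped via sc.addFile(...)
    c
  }
  def getOrCreateModel(): Classifier = model
}

Your existing UDF can then call ModelHolder.getOrCreateModel().process(v), paying the load cost once per executor instead of once per row.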
On Mon, Aug 8, 2016 at 7:44 PM, 莫涛 <mo...@sensetime.com> wrote:

> Hi Ndjido,
>
> Thanks for your reply.
>
> Yes, it would be a good idea if the model could be broadcast.
>
> I'm working with a prebuilt library (on Linux, say classifier.so and
> classifier.h) that requires the model file to be in the local file system.
> As I don't have access to the library source, I wrote a JNI wrapper around
> the classifier.
> The model file can be sent to each executor efficiently with addFile and
> getFile.
> But initModel() is still expensive, as it actually loads a local file into
> C++ heap memory, which is not serializable.
>
> That's why I cannot broadcast the model, and why I want to avoid loading
> it as much as possible.
>
> Best
>
> ------------------------------
> *From:* ndj...@gmail.com <ndj...@gmail.com>
> *Sent:* August 8, 2016 17:16:27
> *To:* 莫涛
> *Cc:* user@spark.apache.org
> *Subject:* Re: how to generate a column using mapParition and then add it
> back to the df?
>
> Hi MoTao,
> What about broadcasting the model?
>
> Cheers,
> Ndjido.
>
> > On 08 Aug 2016, at 11:00, MoTao <mo...@sensetime.com> wrote:
> >
> > Hi all,
> >
> > I'm trying to append a column to a df.
> > I understand that the new column must be created by
> > 1) using literals,
> > 2) transforming an existing column in the df,
> > or 3) generating it from a UDF over the df.
> >
> > In my case, the column to be appended is created by processing each row,
> > like
> >
> > // needs: import spark.implicits._ and
> > // import org.apache.spark.sql.functions.{col, udf}
> > val df = Seq(1.0, 2.0, 3.0).toDF("value")
> > val func = udf { v: Double =>
> >   val model = initModel()
> >   model.process(v)
> > }
> > val df2 = df.withColumn("valueWithBias", func(col("value")))
> >
> > This works fine. However, for performance reasons, I want to avoid
> > calling initModel() for each row.
> > So I tried mapPartitions, like
> >
> > val df = Seq(1.0, 2.0, 3.0).toDF("value")
> > val df2 = df.mapPartitions(rows => {
> >   val model = initModel()
> >   rows.map(row => model.process(row.getAs[Double](0)))
> > })
> > val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
> >
> > But this is wrong, as a column of df2 *CANNOT* be appended to df.
> >
> > The only solution I have found is to force mapPartitions to return a
> > whole row instead of just the new column
> > (something like "row => Row.fromSeq(row.toSeq ++
> > Array(model.process(...)))"),
> > which requires a lot of copying as well.
> >
> > How can I deal with this problem with as little overhead as possible?

--
Best Regards,
Ayan Guha
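For the whole-row approach MoTao sketches at the end of the thread, a minimal Spark 2.x version could look like this (it reuses the hypothetical ModelHolder singleton above, and the "value" / "valueWithBias" column names from the thread; RowEncoder supplies the encoder for the widened schema):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Output schema: all input columns plus the appended one.
val outSchema = StructType(df.schema.fields :+ StructField("valueWithBias", DoubleType))

val df2 = df.mapPartitions { rows =>
  val model = ModelHolder.getOrCreateModel() // initialized once per executor JVM
  rows.map(r => Row.fromSeq(r.toSeq :+ model.process(r.getAs[Double]("value"))))
}(RowEncoder(outSchema))

The per-row Row.fromSeq copy is still there, but the expensive model load now happens once per executor rather than once per partition or per row.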