How about using foreachRDD? I think this is much better than your trick.
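Something like this, roughly (an untested sketch, reusing your texts, getFeatures and modelRF; predict still runs per micro-batch from the driver, and zipping predictions back onto the inputs is the same pattern the MLlib docs use):

def score_batch(time, rdd):
    # keep (text, feature_vector) pairs together from the start
    pairs = rdd.map(lambda txt: (txt, [float(i) for i in getFeatures(txt).split(',')]))
    pairs.cache()  # traversed twice below
    # predict on the feature vectors, then pair each score with its original text
    scores = modelRF.predict(pairs.map(lambda p: p[1]))
    scored = scores.zip(pairs.map(lambda p: p[0]))
    # downstream processing goes here, e.g. scored.saveAsTextFile(...)

texts.foreachRDD(score_batch)
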
2016-05-31 12:32 GMT+07:00 obaidul karim <obaidc...@gmail.com>:

> Hi Guys,
>
> In the end, I am using the below. The trick is using a native Python map
> along with a Spark Streaming transform. It may not be an elegant way,
> however it works :).
>
> def predictScore(texts, modelRF):
>     predictions = texts.map(lambda txt: (txt, getFeatures(txt))) \
>         .map(lambda (txt, features): (txt, features.split(','))) \
>         .map(lambda (txt, features): (txt, [float(i) for i in features])) \
>         .transform(lambda rdd: sc.parallelize(
>             map(lambda x, y: (x, y),
>                 modelRF.predict(rdd.map(lambda (x, y): y)).collect(),
>                 rdd.map(lambda (x, y): x).collect())))
>     # in the transform operation: x = text and y = features
>     # Return will be a tuple of (score, 'original text')
>     return predictions
>
> Hope it will help somebody who is facing the same problem.
> If anybody has a better idea, please post it here.
>
> -Obaid
>
> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
>
>> DStream has a method foreachRDD, so you can walk through all RDDs inside
>> a DStream as you want.
>>
>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>
>> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>
>>> Hi nguyen,
>>>
>>> If I am not mistaken, we cannot call "predict" on a DStream as you have
>>> suggested. We have to use "transform" to be able to perform normal RDD
>>> operations on DStreams, and here I am trapped.
>>>
>>> -Obaid
>>>
>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
>>>
>>>> How about this?
>>>>
>>>> def extract_feature(rf_model, x):
>>>>     text = getFeatures(x).split(',')
>>>>     fea = [float(i) for i in text]
>>>>     prediction = rf_model.predict(fea)
>>>>     return (prediction, x)
>>>>
>>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>>
>>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> Does anybody have any idea on the below?
>>>>>
>>>>> -Obaid
>>>>>
>>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
>>>>>
>>>>>> Hi Guys,
>>>>>>
>>>>>> This is my first mail to the Spark users mailing list.
>>>>>>
>>>>>> I need help on a DStream operation.
>>>>>>
>>>>>> I am using an MLlib random forest model to predict using Spark
>>>>>> Streaming. In the end, I want to combine the feature DStream and
>>>>>> prediction DStream together for further downstream processing.
>>>>>>
>>>>>> I am predicting using the below piece of code:
>>>>>>
>>>>>> predictions = texts.map(lambda x: getFeatures(x)) \
>>>>>>     .map(lambda x: x.split(',')) \
>>>>>>     .map(lambda parts: [float(i) for i in parts]) \
>>>>>>     .transform(lambda rdd: rf_model.predict(rdd))
>>>>>>
>>>>>> Here, texts is a DStream having a single line of text per record, and
>>>>>> getFeatures generates comma-separated features extracted from each
>>>>>> record.
>>>>>>
>>>>>> I want the output as the below tuple:
>>>>>> ("predicted value", "original text")
>>>>>>
>>>>>> How can I achieve that? Or, at least, can I perform .zip like a
>>>>>> normal RDD operation on two DStreams, like below:
>>>>>>
>>>>>> output = texts.zip(predictions)
>>>>>>
>>>>>> Thanks in advance.
>>>>>>
>>>>>> -Obaid
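
For reference, the transform-based trick at the top of the thread can also be written more compactly (an untested sketch with the same sc, getFeatures and modelRF assumptions, and the same caveat that each micro-batch's results are collected to the driver):

def predictScore(texts, modelRF):
    # keep (text, feature_vector) pairs together through the pipeline
    pairs = texts.map(lambda txt: (txt, [float(i) for i in getFeatures(txt).split(',')]))

    def score(rdd):
        rdd.cache()  # read twice below
        scores = modelRF.predict(rdd.map(lambda p: p[1])).collect()
        originals = rdd.map(lambda p: p[0]).collect()
        # rebuild an RDD of (score, original_text) tuples
        return sc.parallelize(zip(scores, originals))

    return pairs.transform(score)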