How about using foreachRDD? I think this is much better than your trick.
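
For example, a rough, untested sketch (reusing the getFeatures helper,
modelRF, and the texts DStream from your post; scoreRDD is just a
placeholder name):

def scoreRDD(rdd):
    # pair each raw text line with its parsed feature vector
    feats = rdd.map(lambda txt: (txt, [float(i) for i in getFeatures(txt).split(',')]))
    # predict() runs on the driver against the feature RDD; zip the collected
    # scores back with the collected original texts
    scored = list(zip(modelRF.predict(feats.map(lambda p: p[1])).collect(),
                      feats.map(lambda p: p[0]).collect()))
    # scored is a plain Python list of (score, original text) pairs;
    # push it to whatever sink you need here

texts.foreachRDD(scoreRDD)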

2016-05-31 12:32 GMT+07:00 obaidul karim <obaidc...@gmail.com>:

> Hi Guys,
>
> In the end, I am using the code below.
> The trick is using a native Python map along with Spark Streaming's
> transform.
> It may not be an elegant way, but it works :).
>
> def predictScore(texts, modelRF):
>     # Pair each raw text line with its parsed feature vector
>     features = texts.map(lambda txt: (txt, [float(i) for i in getFeatures(txt).split(',')]))
>     # transform() exposes each batch's RDD, so modelRF.predict() can run on
>     # the driver; collect both sides, zip them back together, and
>     # re-parallelize the result
>     predictions = features.transform(lambda rdd: sc.parallelize(list(zip(
>         modelRF.predict(rdd.map(lambda p: p[1])).collect(),
>         rdd.map(lambda p: p[0]).collect()))))
>     # Return value is a DStream of (score, original text) tuples
>     return predictions
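>
> For reference, a rough sketch of how it is wired up (ssc and the socket
> source below are placeholders, not my actual input):
>
> lines = ssc.socketTextStream("localhost", 9999)
> scored = predictScore(lines, modelRF)
> scored.pprint()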
>
>
> Hope it helps somebody who is facing the same problem.
> If anybody has a better idea, please post it here.
>
> -Obaid
>
> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com>
> wrote:
>
>> DStream has a method foreachRDD, so you can walk through all the RDDs
>> inside a DStream as you want.
>>
>>
>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
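>>
>> For example, a minimal sketch (process is just a placeholder name; texts
>> is your input DStream):
>>
>> def process(rdd):
>>     print(rdd.take(5))  # any normal driver-side / RDD code goes here
>>
>> texts.foreachRDD(process)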
>>
>> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>
>>> Hi nguyen,
>>>
>>> If I am not mistaken, we cannot call "predict" on a DStream directly, as
>>> you have suggested.
>>> We have to use "transform" to be able to perform normal RDD operations
>>> on DStreams, and that is where I am trapped.
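>>>
>>> For example, a plain RDD operation like this works inside transform
>>> (upper is just an illustration):
>>>
>>> upper = texts.transform(lambda rdd: rdd.map(lambda x: x.upper()))
>>>
>>> But I could not find a way to keep the original text paired with its
>>> prediction.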
>>>
>>> -Obaid
>>>
>>>
>>>
>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalu...@gmail.com>
>>> wrote:
>>>
>>>> How about this?
>>>>
>>>> def extract_feature(rf_model, x):
>>>>     text = getFeatures(x).split(',')
>>>>     fea = [float(i) for i in text]
>>>>     prediction = rf_model.predict(fea)
>>>>     return (prediction, x)
>>>>
>>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>>
>>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> Does anybody have any idea on the below?
>>>>>
>>>>> -Obaid
>>>>>
>>>>>
>>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
>>>>>
>>>>>> Hi Guys,
>>>>>>
>>>>>> This is my first mail to spark users mailing list.
>>>>>>
>>>>>> I need help with a DStream operation.
>>>>>>
>>>>>> In fact, I am using an MLlib random forest model to predict using
>>>>>> Spark Streaming. In the end, I want to combine the feature DStream and
>>>>>> the prediction DStream for further downstream processing.
>>>>>>
>>>>>> I am predicting using the piece of code below:
>>>>>>
>>>>>> predictions = texts.map(lambda x: getFeatures(x)) \
>>>>>>     .map(lambda x: x.split(',')) \
>>>>>>     .map(lambda parts: [float(i) for i in parts]) \
>>>>>>     .transform(lambda rdd: rf_model.predict(rdd))
>>>>>>
>>>>>> Here, texts is a DStream whose records are single lines of text, and
>>>>>> getFeatures generates a comma-separated feature string from each
>>>>>> record.
>>>>>>
>>>>>>
>>>>>> I want the output to be tuples of the form:
>>>>>> ("predicted value", "original text")
>>>>>>
>>>>>> How can I achieve that?
>>>>>> Or, at the very least, can I perform a .zip, as in normal RDD
>>>>>> operations, on two DStreams, like below:
>>>>>> output = texts.zip(predictions)
>>>>>>
>>>>>>
>>>>>> Thanks in advance.
>>>>>>
>>>>>> -Obaid
>>>>>>
>>>>>
>>>>
>>>
>>
>
