foreachRDD does not return any value. I can be used just to send result to
another place/context, like db,file etc.
I could use that but seems like over head of having another hop.
I wanted to make it simple and light.

On Tuesday, 31 May 2016, nguyen duc tuan <newvalu...@gmail.com> wrote:

> How about using foreachRDD ? I think this is much better than your trick.
>
>
> 2016-05-31 12:32 GMT+07:00 obaidul karim <obaidc...@gmail.com
> <javascript:_e(%7B%7D,'cvml','obaidc...@gmail.com');>>:
>
>> Hi Guys,
>>
>> In the end, I am using below.
>> The trick is using "native python map" along with "spark spreaming
>> transform".
>> May not an elegent way, however it works :).
>>
>> def predictScore(texts, modelRF):
>>     predictions = texts.map( lambda txt :  (txt , getFeatures(txt)) ).\
>>      map(lambda (txt, features) : (txt ,(features.split(','))) ).\
>>      map( lambda (txt, features) : (txt, ([float(i) for i in features]))
>> ).\
>>      transform( lambda  rdd: sc.parallelize(\
>>        map( lambda x,y:(x,y), modelRF.predict(rdd.map(lambda
>> (x,y):y)).collect(),rdd.map(lambda (x,y):x).collect() )\
>>        )\
>>      )
>>      # in the transform operation: x=text and y=features
>>     # Retrun will be tuple of (score,'original text')
>>     return predictions
>>
>>
>> Hope, it will help somebody who is facing same problem.
>> If anybody has better idea, please post it here.
>>
>> -Obaid
>>
>> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com
>> <javascript:_e(%7B%7D,'cvml','newvalu...@gmail.com');>> wrote:
>>
>>> Dstream has an method foreachRDD, so you can walk through all RDDs
>>> inside DStream as you want.
>>>
>>>
>>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>>
>>> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com
>>> <javascript:_e(%7B%7D,'cvml','obaidc...@gmail.com');>>:
>>>
>>>> Hi nguyen,
>>>>
>>>> If I am not mistaken, we cannot call  "predict" on "dstream" as you
>>>> have suggested.
>>>> We have to use "transform" to be able to perform normal RDD operations
>>>> on dstreams and here I am trapped.
>>>>
>>>> -Obaid
>>>>
>>>>
>>>>
>>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalu...@gmail.com
>>>> <javascript:_e(%7B%7D,'cvml','newvalu...@gmail.com');>> wrote:
>>>>
>>>>> How about this ?
>>>>>
>>>>> def extract_feature(rf_model, x):
>>>>> text = getFeatures(x).split(',')
>>>>> fea = [float(i) for i in text]
>>>>> prediction = rf_model.predict(fea)
>>>>> return (prediction, x)
>>>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>>>
>>>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com
>>>>> <javascript:_e(%7B%7D,'cvml','obaidc...@gmail.com');>>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Anybody has any idea on below?
>>>>>>
>>>>>> -Obaid
>>>>>>
>>>>>>
>>>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com
>>>>>> <javascript:_e(%7B%7D,'cvml','obaidc...@gmail.com');>> wrote:
>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>> This is my first mail to spark users mailing list.
>>>>>>>
>>>>>>> I need help on Dstream operation.
>>>>>>>
>>>>>>> In fact, I am using a MLlib randomforest model to predict using
>>>>>>> spark streaming. In the end, I want to combine the feature Dstream &
>>>>>>> prediction Dstream together for further downstream processing.
>>>>>>>
>>>>>>> I am predicting using below piece of code:
>>>>>>>
>>>>>>> predictions = texts.map( lambda x : getFeatures(x) ).map(lambda x :
>>>>>>> x.split(',')).map( lambda parts : [float(i) for i in parts]
>>>>>>> ).transform(lambda rdd: rf_model.predict(rdd))
>>>>>>>
>>>>>>> Here texts is dstream having single line of text as records
>>>>>>> getFeatures generates a comma separated features extracted from each
>>>>>>> record
>>>>>>>
>>>>>>>
>>>>>>> I want the output as below tuple:
>>>>>>> ("predicted value", "original text")
>>>>>>>
>>>>>>> How can I achieve that ?
>>>>>>> or
>>>>>>> at least can I perform .zip like normal RDD operation on two
>>>>>>> Dstreams, like below:
>>>>>>> output = texts.zip(predictions)
>>>>>>>
>>>>>>>
>>>>>>> Thanks in advance.
>>>>>>>
>>>>>>> -Obaid
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to