Hi Margus, thanks for reporting this. I’ve been able to reproduce it, and there
does indeed appear to be a bug. I’ve created a JIRA and have a fix ready, which
can hopefully be included in 1.3.1.
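As a rough check on the numbers, here is a plain-Scala sketch (no Spark) that assumes there is no intercept term, since the log in your first message shows only weights. With the logged weights of about 7.333 per feature, the first three rows of your first testing file should predict values near 44, 66 and 88. The zero initial weights from setInitialWeights(Vectors.zeros(numFeatures)) give exactly the all-0.0 output you saw. The names ExpectedPredictions, dot and predictWith are made up for illustration.

```scala
object ExpectedPredictions {
  // Dot product of two equal-length dense vectors (assumes no intercept term).
  def dot(w: Array[Double], x: Array[Double]): Double = {
    require(w.length == x.length, "dimension mismatch")
    w.zip(x).map { case (a, b) => a * b }.sum
  }

  // Weights copied from the "Current model" log line in the thread.
  val logged: Array[Double] = Array.fill(3)(7.333333333333333)
  // Initial weights, as set by setInitialWeights(Vectors.zeros(numFeatures)).
  val initial: Array[Double] = Array.fill(3)(0.0)

  // First three rows of the testing file from the first message: (label, features).
  val rows: Seq[(Double, Array[Double])] = Seq(
    (1.0, Array(2.0, 2.0, 2.0)),
    (2.0, Array(3.0, 3.0, 3.0)),
    (3.0, Array(4.0, 4.0, 4.0))
  )

  // Pairs each label with the prediction w . x, mirroring predictOnValues' output shape.
  def predictWith(w: Array[Double]): Seq[(Double, Double)] =
    rows.map { case (label, x) => (label, dot(w, x)) }

  def main(args: Array[String]): Unit = {
    println(predictWith(logged))  // predictions approximately 44.0, 66.0, 88.0
    println(predictWith(initial)) // every prediction is 0.0, as in the reported output
  }
}
```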
In the meantime, you can get the desired result using transform:
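    model.trainOn(trainingData)

    // The function given to transform runs on the driver once per batch,
    // so latestModel() returns the weights learned up to that batch.
    testData.transform { rdd =>
      val latest = model.latestModel()
      rdd.map(lp => (lp.label, latest.predict(lp.features)))
    }.print()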
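This works because the function passed to transform is called on the driver once per batch interval. As a result, model.latestModel() is read again every batch and picks up whatever trainOn has learned so far. The sketch below is a simplified plain-Scala illustration of the difference between binding a predict function once, at setup time, and looking the model up per batch. Model, Holder, run and perBatch are hypothetical stand-ins, not the MLlib classes. The sketch is only meant to show why predictions tied to the initial zero weights keep returning 0.0.

```scala
object PerBatchLookup {
  // Hypothetical stand-in for a linear model (no intercept); not MLlib's class.
  final case class Model(weights: Vector[Double]) {
    def predict(x: Vector[Double]): Double =
      weights.zip(x).map { case (w, v) => w * v }.sum
  }

  // Hypothetical stand-in for the streaming algorithm: training swaps in a new model.
  final class Holder(initial: Model) {
    @volatile private var current: Model = initial
    def latestModel(): Model = current
    def update(m: Model): Unit = current = m
  }

  // Returns (eagerly bound predictions, per-batch lookup predictions) for one batch.
  def run(): (Seq[Double], Seq[Double]) = {
    val holder = new Holder(Model(Vector(0.0, 0.0, 0.0)))

    // Eager: eta-expansion evaluates holder.latestModel() once, right here,
    // so this function stays tied to the initial zero-weight model.
    val eager: Vector[Double] => Double = holder.latestModel().predict

    // Per batch: look the model up each time a batch is processed,
    // analogous to calling latestModel() inside transform.
    def perBatch(batch: Seq[Vector[Double]]): Seq[Double] = {
      val latest = holder.latestModel()
      batch.map(latest.predict)
    }

    // Simulate a training batch updating the weights (values from the log).
    holder.update(Model(Vector.fill(3)(7.333333333333333)))

    val batch = Seq(Vector(2.0, 2.0, 2.0))
    (batch.map(eager), perBatch(batch))
  }

  def main(args: Array[String]): Unit = {
    val (eager, perBatch) = run()
    println(s"eager:     $eager")    // List(0.0)
    println(s"per batch: $perBatch") // approximately List(44.0)
  }
}
```

The same idea applies in the Spark version. Anything evaluated inside the transform closure, but outside rdd.map, is evaluated fresh on the driver for each batch.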
-------------------------
jeremyfreeman.net
@thefreemanlab
On Mar 15, 2015, at 2:56 PM, Margus Roo wrote:
> Hi again
>
> I tried the same example,
> examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
> from 1.3.0, and when the testing file content is:
> (0.0,[3.0,4.0,3.0])
> (0.0,[4.0,4.0,4.0])
> (4.0,[5.0,5.0,5.0])
> (5.0,[5.0,6.0,6.0])
> (6.0,[7.0,4.0,7.0])
> (7.0,[8.0,6.0,8.0])
> (8.0,[44.0,9.0,9.0])
> (9.0,[14.0,30.0,10.0])
>
> the answer is:
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (4.0,0.0)
> (5.0,0.0)
> (6.0,0.0)
> (7.0,0.0)
> (8.0,0.0)
> (9.0,0.0)
>
> What is wrong?
> I can see that the model's weights change when I put new data into the
> training directory.
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> +372 51 480
> On 14/03/15 09:05, Margus Roo wrote:
>> Hi
>>
>> I am trying to understand the example provided in
>> https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html, section
>> "Streaming linear regression".
>>
>> Code:
>> import org.apache.spark._
>> import org.apache.spark.streaming._
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.regression.LabeledPoint
>> import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.dstream.DStream
>>
>> object StreamingLinReg {
>>
>>   def main(args: Array[String]): Unit = {
>>
>>     val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
>>     val ssc = new StreamingContext(conf, Seconds(10))
>>
>>     val trainingData = ssc
>>       .textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/")
>>       .map(LabeledPoint.parse)
>>       .cache()
>>
>>     val testData = ssc
>>       .textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/")
>>       .map(LabeledPoint.parse)
>>
>>     val numFeatures = 3
>>     val model = new StreamingLinearRegressionWithSGD()
>>       .setInitialWeights(Vectors.zeros(numFeatures))
>>
>>     model.trainOn(trainingData)
>>     model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> I compiled the code and ran it, then put a file containing
>> (1.0,[2.0,2.0,2.0])
>> (2.0,[3.0,3.0,3.0])
>> (3.0,[4.0,4.0,4.0])
>> (4.0,[5.0,5.0,5.0])
>> (5.0,[6.0,6.0,6.0])
>> (6.0,[7.0,7.0,7.0])
>> (7.0,[8.0,8.0,8.0])
>> (8.0,[9.0,9.0,9.0])
>> (9.0,[10.0,10.0,10.0])
>> into the training directory.
>>
>> I can see that the model's weights change:
>> 15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model:
>> weights, [7.333333333333333,7.333333333333333,7.333333333333333]
>>
>> Now I can put whatever I want into the testing directory, but I cannot
>> understand the answer.
>> For example, I can put the same file I used for training into the testing
>> directory. The file content is:
>> (1.0,[2.0,2.0,2.0])
>> (2.0,[3.0,3.0,3.0])
>> (3.0,[4.0,4.0,4.0])
>> (4.0,[5.0,5.0,5.0])
>> (5.0,[6.0,6.0,6.0])
>> (6.0,[7.0,7.0,7.0])
>> (7.0,[8.0,8.0,8.0])
>> (8.0,[9.0,9.0,9.0])
>> (9.0,[10.0,10.0,10.0])
>>
>> and the answer is:
>> (1.0,0.0)
>> (2.0,0.0)
>> (3.0,0.0)
>> (4.0,0.0)
>> (5.0,0.0)
>> (6.0,0.0)
>> (7.0,0.0)
>> (8.0,0.0)
>> (9.0,0.0)
>>
>> And when my file content is:
>> (0.0,[2.0,2.0,2.0])
>> (0.0,[3.0,3.0,3.0])
>> (0.0,[4.0,4.0,4.0])
>> (0.0,[5.0,5.0,5.0])
>> (0.0,[6.0,6.0,6.0])
>> (0.0,[7.0,7.0,7.0])
>> (0.0,[8.0,8.0,8.0])
>> (0.0,[9.0,9.0,9.0])
>> (0.0,[10.0,10.0,10.0])
>>
>> the answer is:
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>>
>> I expect to get the label predicted by the model.
>