Hi,
The model weights are not updating for streaming linear regression. The code and
data below are what I am running.
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster("local[1]").setAppName("1feature")
// 25-second batch interval
val ssc = new StreamingContext(conf, Seconds(25))
// Each text line is parsed as "(label,[f1,f2,f3])"
val trainingData = ssc.textFileStream("file:///data/TrainStreamDir").map(LabeledPoint.parse)
val testData = ssc.textFileStream("file:///data/TestStreamDir").map(LabeledPoint.parse)
val numFeatures = 3
// Start from all-zero weights, one per feature
val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
// Update the model on every training batch; print (label, prediction) pairs for each test batch
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
Sample data in TrainStreamDir:
(10240,[1,21,0])
(9936,[2,21,15])
(10118,[3,21,30])
(10174,[4,21,45])
(10460,[5,22,0])
(9961,[6,22,15])
(10372,[7,22,30])
(10666,[8,22,45])
(10300,[9,23,0])
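To rule out a format problem, here is a minimal plain-Scala sketch that checks each sample line has the `(label,[f1,f2,f3])` shape with exactly numFeatures = 3 values. `CheckSample` and `parseLine` are hypothetical helpers written only for illustration; this is not Spark's `LabeledPoint.parse`, just a rough approximation of the text form it reads.

```scala
object CheckSample {
  // Hypothetical approximation of the "(label,[f1,f2,...])" text form;
  // NOT Spark's LabeledPoint.parse.
  val Pattern = """\(\s*([^,\s]+)\s*,\s*\[([^\]]*)\]\s*\)""".r

  // Returns (label, features) if the whole line matches and every value is numeric.
  def parseLine(line: String): Option[(Double, Array[Double])] = line.trim match {
    case Pattern(label, feats) =>
      try Some((label.toDouble, feats.split(",").map(_.trim.toDouble)))
      catch { case _: NumberFormatException => None }
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val numFeatures = 3
    // The sample lines from TrainStreamDir shown above
    val sample = Seq(
      "(10240,[1,21,0])", "(9936,[2,21,15])", "(10118,[3,21,30])",
      "(10174,[4,21,45])", "(10460,[5,22,0])", "(9961,[6,22,15])",
      "(10372,[7,22,30])", "(10666,[8,22,45])", "(10300,[9,23,0])")
    sample.foreach { s =>
      parseLine(s) match {
        case Some((label, f)) if f.length == numFeatures =>
          println(s"ok: $label -> ${f.mkString(",")}")
        case _ =>
          println(s"bad line: $s")
      }
    }
  }
}
```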
Sample output:
14/11/10 15:52:55 INFO scheduler.JobScheduler: Added jobs for time 1415652775000 ms
14/11/10 15:52:55 INFO scheduler.JobScheduler: Starting job streaming job 1415652775000 ms.0 from job set of time 1415652775000 ms
14/11/10 15:52:55 INFO spark.SparkContext: Starting job: count at GradientDescent.scala:162
14/11/10 15:52:55 INFO spark.SparkContext: Job finished: count at GradientDescent.scala:162, took 3.1689E-5 s
14/11/10 15:52:55 INFO optimization.GradientDescent: GradientDescent.runMiniBatchSGD returning initial weights, no data found
14/11/10 15:52:55 INFO regression.StreamingLinearRegressionWithSGD: Model updated at time 1415652775000 ms
14/11/10 15:52:55 INFO regression.StreamingLinearRegressionWithSGD: Current model: weights, [0.0,0.0,0.0]
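From the log, the training batch at 1415652775000 ms appears to have been empty ("no data found"). The Spark Streaming programming guide says textFileStream only processes files that appear in the monitored directory after the stream has started, and that they should be moved or renamed into it atomically; files that were already there are not picked up. Below is a minimal sketch of dropping a training file in that way, assuming a staging directory on the same filesystem as the watched one (an atomic move across filesystems fails). `DropTrainingFile`, `dropFile`, and the directory names are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object DropTrainingFile {
  // Writes the lines to a temp file in stagingDir, then atomically moves it
  // into watchedDir so the stream sees one complete, new file.
  // Assumes stagingDir and watchedDir are on the same filesystem.
  def dropFile(watchedDir: Path, stagingDir: Path, name: String, lines: Seq[String]): Path = {
    val staged = Files.createTempFile(stagingDir, "train-", ".tmp")
    Files.write(staged, (lines.mkString("\n") + "\n").getBytes(StandardCharsets.UTF_8))
    Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit = {
    // Demo on temp directories; in the setup above the watched directory
    // would be /data/TrainStreamDir (hypothetical staging dir next to it).
    val root = Files.createTempDirectory("stream-demo")
    val watched = Files.createDirectory(root.resolve("TrainStreamDir"))
    val staging = Files.createDirectory(root.resolve("staging"))
    val out = dropFile(watched, staging, "batch-0001.txt",
      Seq("(10240,[1,21,0])", "(9936,[2,21,15])"))
    println(s"moved into $out")
  }
}
```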
Thanks
Tri