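Training is triggered by new files arriving in the monitored directory. The "returning initial weights, no data found" line in your log means that batch saw no new files, so the weights stayed at their initial values. Place new files in the directory while the streaming job is running and it will start training the model.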
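As a rough illustration of "placing new files in the directory", here is a minimal sketch that writes a batch of training lines to a staging directory and then moves the finished file into the watched directory, so the stream does not pick up a half-written file. The names DropTrainingFile, dropFile, watchedDir and stagingDir are hypothetical and not part of Spark, and the sketch assumes both directories are on the same local filesystem.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper, not part of Spark: stages a file and moves it into
// the directory that textFileStream is monitoring.
object DropTrainingFile {
  def dropFile(watchedDir: Path, stagingDir: Path, name: String, lines: Seq[String]): Path = {
    Files.createDirectories(watchedDir)
    Files.createDirectories(stagingDir)

    // Write the complete file outside the watched directory first.
    val staged = stagingDir.resolve(name)
    Files.write(staged, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))

    // Move it in as a single step. ATOMIC_MOVE throws if the two directories
    // are on different filesystems (assumption: they are on the same one).
    Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }
}
```

With the paths from your mail, a call made after ssc.start() might look like DropTrainingFile.dropFile(Paths.get("/data/TrainStreamDir"), Paths.get("/data/staging"), "batch-1.txt", Seq("(10240,[1,21,0])")), where /data/staging and batch-1.txt are made-up names. Each new batch of data needs a new file name.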

2014-11-11 5:03 GMT+08:00 Bui, Tri <tri....@verizonwireless.com.invalid>:
> Hi,
>
> The model weight is not updating for streaming linear regression. The
> code and data below is what I am running.
>
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.regression.LabeledPoint
> import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.StreamingContext._
>
> val conf = new SparkConf().setMaster("local[1]").setAppName("1feature")
> val ssc = new StreamingContext(conf, Seconds(25))
> val trainingData = ssc.textFileStream("file:///data/TrainStreamDir").map(LabeledPoint.parse)
> val testData = ssc.textFileStream("file:///data/TestStreamDir").map(LabeledPoint.parse)
> val numFeatures = 3
> val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
> model.trainOn(trainingData)
> model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
> ssc.start()
> ssc.awaitTermination()
>
> *sample Data in the TrainStreamDir:*
>
> (10240,[1,21,0])
> (9936,[2,21,15])
> (10118,[3,21,30])
> (10174,[4,21,45])
> (10460,[5,22,0])
> (9961,[6,22,15])
> (10372,[7,22,30])
> (10666,[8,22,45])
> (10300,[9,23,0])
>
> *Sample of output results:*
>
> 14/11/10 15:52:55 INFO scheduler.JobScheduler: Added jobs for time 1415652775000 ms
> 14/11/10 15:52:55 INFO scheduler.JobScheduler: Starting job streaming job 1415652775000 ms.0 from job set of time 1415652775000 ms
> 14/11/10 15:52:55 INFO spark.SparkContext: Starting job: count at GradientDescent.scala:162
> 14/11/10 15:52:55 INFO spark.SparkContext: Job finished: count at GradientDescent.scala:162, took 3.1689E-5 s
> 14/11/10 15:52:55 INFO optimization.GradientDescent: GradientDescent.runMiniBatchSGD returning initial weights, no data found
> 14/11/10 15:52:55 INFO regression.StreamingLinearRegressionWithSGD: Model updated at time 1415652775000 ms
> 14/11/10 15:52:55 INFO regression.StreamingLinearRegressionWithSGD: Current model: weights, [0.0,0.0,0.0]
>
> Thanks
> Tri