Regarding the first question, can you say more about how you are loading your 
data? What is the size of the data set? And is that the only error you see, 
and do you see it only in the streaming version?

For the second question, there are a couple of reasons the weights might 
differ slightly; it depends on exactly how you set up the comparison. When you 
split the data into 5, were those the same 5 chunks you used in the streaming 
case? And were they presented to the optimizer in the same order? A difference 
in either could produce small differences in the resulting weights, but that 
doesn’t mean it’s doing anything wrong.
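
To make the order effect concrete, here is a tiny self-contained sketch (plain 
Scala, no Spark; the data points and learning rate are made up for 
illustration). One pass of single-point gradient descent over the same three 
points gives a slightly different weight depending on presentation order:

```scala
object OrderDemo {
  // One pass of single-point gradient descent on least squares.
  def sgd(points: Seq[(Double, Double)], lr: Double = 0.1): Double = {
    var w = 0.0
    for ((x, y) <- points) {
      val grad = (w * x - y) * x // least-squares gradient for one point
      w -= lr * grad
    }
    w
  }

  def main(args: Array[String]): Unit = {
    val data = Seq((1.0, 2.1), (2.0, 3.9), (3.0, 6.2))
    println(f"ordered:  ${sgd(data)}%.4f")         // prints 1.9506
    println(f"reversed: ${sgd(data.reverse)}%.4f") // prints 1.9164
  }
}
```

A full-batch method like LBFGS is less order-sensitive than this within a 
single batch, but warm-starting each batch from the previous weights, as in 
your loop, reintroduces the same path dependence across batches.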

-------------------------
jeremyfreeman.net
@thefreemanlab

On Mar 17, 2015, at 6:19 PM, EcoMotto Inc. <ecomot...@gmail.com> wrote:

> Hello Jeremy,
> 
> Thank you for your reply.
> 
> When I am running this code on the local machine on streaming data, it 
> keeps giving me this error:
> WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost): 
> java.io.FileNotFoundException: 
> /tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file or 
> directory) 
> 
> And when I execute the same code on static data after randomly splitting it 
> into 5 sets, it gives me slightly different weights (the difference is in 
> the decimals). I am still trying to analyse why this would happen.
> Any inputs on why this might be happening?
> 
> Best Regards,
> Arunkumar
> 
> 
> On Tue, Mar 17, 2015 at 11:32 AM, Jeremy Freeman <freeman.jer...@gmail.com> 
> wrote:
> Hi Arunkumar,
> 
> That looks like it should work. Logically, it’s similar to the implementation 
> used by StreamingLinearRegression and StreamingLogisticRegression; see this 
> class:
> 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
> 
> which exposes the kind of operation you’re describing (for any linear method).
> 
> The nice thing about the gradient-based methods is that they can use existing 
> MLlib optimization routines in this fairly direct way. Other methods (such as 
> KMeans) require a bit more reengineering.
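> 
> Since you asked about LBFGS specifically: the built-in streaming regression 
> uses SGD rather than LBFGS, but the batch-to-batch weight carry-over is the 
> same idea. A rough sketch (assuming you already have a StreamingContext `ssc` 
> and a `numFeatures` value in scope):
> 
> ```scala
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
> 
> // A DStream[LabeledPoint], e.g. parsed from a text stream
> val trainingData = ssc.textFileStream("/some/dir").map(LabeledPoint.parse)
> 
> val model = new StreamingLinearRegressionWithSGD()
>   .setInitialWeights(Vectors.zeros(numFeatures))
> 
> // Re-fits on every batch, carrying the weights forward automatically
> model.trainOn(trainingData)
> 
> ssc.start()
> ssc.awaitTermination()
> ```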
> 
> — Jeremy
> 
> -------------------------
> jeremyfreeman.net
> @thefreemanlab
> 
> On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. <ecomot...@gmail.com> wrote:
> 
>> Hello,
>> 
>> I am new to the Spark Streaming API.
>> 
>> I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) on 
>> streaming data? Currently I am using foreachRDD to iterate over the DStream, 
>> and I am generating a model from each RDD. Am I doing anything logically 
>> wrong here?
>> Thank you.
>> 
>> Sample Code:
>> val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
>> var initialWeights =
>>   Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
>> var isFirst = true
>> // Note: treating the first optimized weight as the intercept assumes each
>> // feature vector carries a leading 1.0 bias column.
>> var model = new LinearRegressionModel(null, 1.0)
>> 
>> parsedData.foreachRDD { rdd =>
>>   if (!isFirst) {
>>     // Warm-start from the previous batch: intercept first, then weights
>>     initialWeights = Vectors.dense(model.intercept +: model.weights.toArray)
>>     println("Initial weights: " + initialWeights)
>>   }
>>   isFirst = false
>>   val w = algorithm.optimize(rdd, initialWeights).toArray
>>   model = new LinearRegressionModel(Vectors.dense(w.drop(1)), w.head)
>>   println("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
>> }
>> 
>> 
>> Best Regards,
>> Arunkumar
> 
> 
