Hi Gna,

Thanks for reporting the problem. Because the level 1 operations in the FlinkML 
BLAS library don’t support SparseVector, SparseVector is not currently supported. 
I’ve filed this in JIRA [1].

Maybe I can send a patch to solve this in a few days.

[1]: https://issues.apache.org/jira/browse/FLINK-3330

Regards,
Chiwan Park

> On Feb 4, 2016, at 5:39 AM, Sourigna Phetsarath <gna.phetsar...@teamaol.com> 
> wrote:
> 
> All:
> 
> I'm trying to use SparseVectors with FlinkML 0.10.1.  It does not seem to be 
> working.  Here is a UnitTest that I created to recreate the problem:
> 
> 
> package com.aol.ds.arc.ml.poc.flink 
> 
> import org.junit.After
> import org.junit.Before
> import org.slf4j.LoggerFactory
> import org.apache.flink.test.util.ForkableFlinkMiniCluster
> import scala.concurrent.duration.FiniteDuration
> import java.util.concurrent.TimeUnit
> import org.apache.flink.test.util.TestBaseUtils
> import org.apache.flink.runtime.StreamingMode
> import org.apache.flink.test.util.TestEnvironment
> import org.junit.Test
> import org.apache.flink.ml.common.LabeledVector
> import org.apache.flink.ml.math.SparseVector
> import org.apache.flink.api.scala._
> import org.apache.flink.ml.regression.MultipleLinearRegression
> import org.apache.flink.ml.math.DenseVector
> class FlinkMLRTest {
>   var Logger = LoggerFactory.getLogger(getClass.getName)
>   var cluster: Option[ForkableFlinkMiniCluster] = None
>   val parallelism = 4
>   val DEFAULT_AKKA_ASK_TIMEOUT = 1000
>   val DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, 
> TimeUnit.SECONDS)
>   @Before
>   def doBefore(): Unit = {
>     val cl = TestBaseUtils.startCluster(
>       1,
>       parallelism,
>       StreamingMode.BATCH_ONLY,
>       false,
>       false,
>       true)
>     val clusterEnvironment = new TestEnvironment(cl, parallelism)
>     clusterEnvironment.setAsContext()
>     cluster = Some(cl)
>   }
>   @After
>   def doAfter(): Unit = {
>     cluster.map(c => TestBaseUtils.stopCluster(c, DEFAULT_TIMEOUT))
>   }
>   @Test
>   def testMLR() {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val training = Seq(
>       new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 
> 1.0, 1.0))),
>       new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), 
> Array(1.0, 1.0, 1.0, 1.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 
> 1.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 
> 1.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
>     val testing = Seq(
>       new LabeledVector(1.0, new SparseVector(10, Array(0, 3), Array(1.0, 
> 1.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0, 2, 3), Array(0.0, 
> 1.0, 1.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
>     val trainingDS = env.fromCollection(training)
>     val testingDS = env.fromCollection(testing)
>     trainingDS.print()
>     val mlr = MultipleLinearRegression()
>       .setIterations(100)
>       .setStepsize(2)
>       .setConvergenceThreshold(0.001)
>     mlr.fit(trainingDS)
>     val weights = mlr.weightsOption match {
>       case Some(weights) => { weights.collect() }
>       case None => throw new Exception("Could not calculate the weights.")
>     }
>     if (Logger.isInfoEnabled())
>       Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
>     testingDS.print()
>     val predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label)))
>     if (Logger.isInfoEnabled()) {
>       Logger.info(predictions.collect().mkString(","))
>     }
>   }
>   @Test
>   def testMLR_DenseVector() {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val training = Seq(
>       new LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 
> 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(1.0, DenseVector(1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 
> 0.0, 0.0, 0.0, 1.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 
> 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 
> 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 
> 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 
> 0.0, 0.0, 0.0, 0.0)))
>     val testing = Seq(
>       new LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 
> 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 
> 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 
> 0.0, 0.0, 0.0, 0.0)))
>     val trainingDS = env.fromCollection(training)
>     val testingDS = env.fromCollection(testing)
>     trainingDS.print()
>     val mlr = MultipleLinearRegression()
>       .setIterations(100)
>       .setStepsize(2)
>       .setConvergenceThreshold(0.001)
>     mlr.fit(trainingDS)
>     val weights = mlr.weightsOption match {
>       case Some(weights) => { weights.collect() }
>       case None => throw new Exception("Could not calculate the weights.")
>     }
>     if (Logger.isInfoEnabled())
>       Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
>     testingDS.print()
>     val predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label)))
>     if (Logger.isInfoEnabled()) {
>       Logger.info(s"**** PREDICTIONS: ${predictions.collect().mkString(",")}")
>     }
>   }
> }
> 
> It fails with this error:
> 
> java.lang.IllegalArgumentException: axpy only supports adding to a dense 
> vector but got type class org.apache.flink.ml.math.SparseVector.
> at org.apache.flink.ml.math.BLAS$.axpy(BLAS.scala:60)
> at 
> org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(GradientDescent.scala:181)
> at 
> org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(GradientDescent.scala:177)
> at org.apache.flink.api.scala.DataSet$$anon$7.reduce(DataSet.scala:583)
> at 
> org.apache.flink.runtime.operators.AllReduceDriver.run(AllReduceDriver.java:132)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
> at 
> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:144)
> at 
> org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> 
> If SparseVectors are not supported, when can we expect them to be supported 
> for MLR?
> 
> Thanks in advance for any information that you can provide.
> -- 
> Gna Phetsarath
> System Architect // AOL Platforms // Data Services // Applied Research Chapter
> 770 Broadway, 5th Floor, New York, NY 10003
> o: 212.402.4871 // m: 917.373.7363
> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
> 
> 
> 

Reply via email to