All:

I'm trying to use SparseVectors with FlinkML 0.10.1, but it does not seem to
work.  Here is a unit test that I wrote to reproduce the problem:


*package* com.aol.ds.arc.ml.poc.flink


> *import* org.junit.After
> *import* org.junit.Before
> *import* org.slf4j.LoggerFactory
> *import* org.apache.flink.test.util.ForkableFlinkMiniCluster
> *import* scala.concurrent.duration.FiniteDuration
> *import* java.util.concurrent.TimeUnit
> *import* org.apache.flink.test.util.TestBaseUtils
> *import* org.apache.flink.runtime.StreamingMode
> *import* org.apache.flink.test.util.TestEnvironment
> *import* org.junit.Test
> *import* org.apache.flink.ml.common.LabeledVector
> *import* org.apache.flink.ml.math.SparseVector
> *import* org.apache.flink.api.scala._
> *import* org.apache.flink.ml.regression.MultipleLinearRegression
> *import* org.apache.flink.ml.math.DenseVector
/**
 * Reproduction tests that train FlinkML's `MultipleLinearRegression` on the
 * same pipeline with two kinds of feature vectors: `SparseVector` and
 * `DenseVector`.
 *
 * Every test runs against a local mini cluster that is started in `doBefore`
 * and stopped in `doAfter`. The tests contain no assertions; they only log the
 * learned weights and the evaluation output, so a test "fails" only when the
 * pipeline throws.
 */
class FlinkMLRTest {
  val Logger = LoggerFactory.getLogger(getClass.getName)

  // Holds the running mini cluster between @Before and @After; None until started.
  var cluster: Option[ForkableFlinkMiniCluster] = None

  val parallelism = 4
  val DEFAULT_AKKA_ASK_TIMEOUT = 1000
  // Note: the unit is SECONDS, so this is a 1000-second shutdown timeout.
  val DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS)

  /** Starts a batch-only mini cluster and installs it as the context environment. */
  @Before
  def doBefore(): Unit = {
    val cl = TestBaseUtils.startCluster(
      1,
      parallelism,
      StreamingMode.BATCH_ONLY,
      false,
      false,
      true)
    val clusterEnvironment = new TestEnvironment(cl, parallelism)
    clusterEnvironment.setAsContext()
    cluster = Some(cl)
  }

  /** Stops the mini cluster if one was started. */
  @After
  def doAfter(): Unit = {
    // foreach rather than map: the call is made purely for its side effect.
    cluster.foreach(c => TestBaseUtils.stopCluster(c, DEFAULT_TIMEOUT))
  }

  /**
   * Trains and evaluates the regression on sparse feature vectors.
   *
   * NOTE(review): this is presumably the test that raises
   * "axpy only supports adding to a dense vector" on FlinkML 0.10.1, since the
   * reported exception names `SparseVector` - confirm against the run output.
   */
  @Test
  def testMLR(): Unit = {
    val training = Seq(
      new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))),
      new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), Array(1.0, 1.0, 1.0, 1.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
    val testing = Seq(
      new LabeledVector(1.0, new SparseVector(10, Array(0, 3), Array(1.0, 1.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0, 2, 3), Array(0.0, 1.0, 1.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))

    fitAndLog(training, testing)
  }

  /**
   * Trains and evaluates the regression on dense feature vectors.
   *
   * NOTE(review): these vectors have 11 components, whereas the sparse test
   * declares size 10, and the values differ, so the two tests do not use the
   * same data set.
   */
  @Test
  def testMLR_DenseVector(): Unit = {
    val training = Seq(
      new LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(1.0, DenseVector(1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)))
    val testing = Seq(
      new LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)))

    fitAndLog(training, testing)
  }

  /**
   * Fits a `MultipleLinearRegression` on `training`, then logs the learned
   * weights and the result of evaluating the model on `testing`.
   *
   * @param training labeled examples used to fit the model
   * @param testing  labeled examples passed to `evaluate` as (vector, label) pairs
   * @throws Exception if the fitted model exposes no weights
   */
  private def fitAndLog(training: Seq[LabeledVector], testing: Seq[LabeledVector]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val trainingDS = env.fromCollection(training)
    val testingDS = env.fromCollection(testing)

    trainingDS.print()

    val mlr = MultipleLinearRegression()
      .setIterations(100)
      .setStepsize(2)
      .setConvergenceThreshold(0.001)
    mlr.fit(trainingDS)

    // Collect unconditionally (not only when logging) so the training job
    // is always executed, as in the original tests.
    val weights = mlr.weightsOption
      .map(_.collect())
      .getOrElse(throw new Exception("Could not calculate the weights."))
    if (Logger.isInfoEnabled()) {
      Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
    }

    testingDS.print()

    val predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label)))
    // The collect sits inside the guard, so predictions are only
    // materialised when INFO logging is enabled.
    if (Logger.isInfoEnabled()) {
      Logger.info(s"**** PREDICTIONS: ${predictions.collect().mkString(",")}")
    }
  }
}


It fails with this error:

java.lang.IllegalArgumentException: axpy only supports adding to a dense vector but got type class org.apache.flink.ml.math.SparseVector.
    at org.apache.flink.ml.math.BLAS$.axpy(BLAS.scala:60)
    at org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(GradientDescent.scala:181)
    at org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(GradientDescent.scala:177)
    at org.apache.flink.api.scala.DataSet$$anon$7.reduce(DataSet.scala:583)
    at org.apache.flink.runtime.operators.AllReduceDriver.run(AllReduceDriver.java:132)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
    at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:144)
    at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)


If SparseVectors are not supported, when can we expect them to be supported
for MLR?

Thanks in advance for any information that you can provide.
-- 


*Gna Phetsarath*
System Architect // AOL Platforms // Data Services //
Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 aim: sphetsarath20 t: @sourigna

* <http://www.aolplatforms.com>*

Reply via email to