All: I'm trying to use SparseVectors with FlinkML 0.10.1. It does not seem to be working. Here is a unit test that I created to reproduce the problem:
*package* com.aol.ds.arc.ml.poc.flink > *import* org.junit.After > *import* org.junit.Before > *import* org.slf4j.LoggerFactory > *import* org.apache.flink.test.util.ForkableFlinkMiniCluster > *import* scala.concurrent.duration.FiniteDuration > *import* java.util.concurrent.TimeUnit > *import* org.apache.flink.test.util.TestBaseUtils > *import* org.apache.flink.runtime.StreamingMode > *import* org.apache.flink.test.util.TestEnvironment > *import* org.junit.Test > *import* org.apache.flink.ml.common.LabeledVector > *import* org.apache.flink.ml.math.SparseVector > *import* org.apache.flink.api.scala._ > *import* org.apache.flink.ml.regression.MultipleLinearRegression > *import* org.apache.flink.ml.math.DenseVector > *class* FlinkMLRTest { > *var* Logger = LoggerFactory.getLogger(getClass.getName) > *var* cluster: Option[ForkableFlinkMiniCluster] = None > *val* parallelism = 4 > *val* DEFAULT_AKKA_ASK_TIMEOUT = 1000 > *val* DEFAULT_TIMEOUT = *new* FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, > TimeUnit.SECONDS) > @Before > *def* doBefore(): Unit = { > *val* cl = TestBaseUtils.startCluster( > 1, > parallelism, > StreamingMode.BATCH_ONLY, > *false*, > *false*, > *true*) > *val* clusterEnvironment = *new* TestEnvironment(cl, parallelism) > clusterEnvironment.setAsContext() > cluster = Some(cl) > } > @After > *def* doAfter(): Unit = { > cluster.map(c => TestBaseUtils.stopCluster(c, DEFAULT_TIMEOUT)) > } > @Test > *def* testMLR() { > *val* env = ExecutionEnvironment.getExecutionEnvironment > *val* training = Seq( > *new* LabeledVector(1.0, *new* SparseVector(10, Array(0, 2, 3), > Array(1.0, 1.0, 1.0))), > *new* LabeledVector(1.0, *new* SparseVector(10, Array(0, 1, 5, 9), > Array(1.0, 1.0, 1.0, 1.0))), > *new* LabeledVector(0.0, *new* SparseVector(10, Array(0, 2), Array( > 0.0, 1.0))), > *new* LabeledVector(0.0, *new* SparseVector(10, Array(0), Array(0.0 > ))), > *new* LabeledVector(0.0, *new* SparseVector(10, Array(0, 2), Array( > 0.0, 1.0))), > *new* LabeledVector(0.0, *new* 
SparseVector(10, Array(0), Array(0.0 > )))) > *val* testing = Seq( > *new* LabeledVector(1.0, *new* SparseVector(10, Array(0, 3), Array( > 1.0, 1.0))), > *new* LabeledVector(0.0, *new* SparseVector(10, Array(0, 2, 3), > Array(0.0, 1.0, 1.0))), > *new* LabeledVector(0.0, *new* SparseVector(10, Array(0), Array(0.0 > )))) > *val* trainingDS = env.fromCollection(training) > *val* testingDS = env.fromCollection(testing) > trainingDS.print() > *val* mlr = MultipleLinearRegression() > .setIterations(100) > .setStepsize(2) > .setConvergenceThreshold(0.001) > mlr.fit(trainingDS) > *val* weights = mlr.weightsOption *match* { > *case* Some(weights) => { weights.collect() } > *case* None => *throw* *new* Exception("Could not calculate the > weights.") > } > *if* (Logger.isInfoEnabled()) > Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}") > testingDS.print() > *val* predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label > ))) > *if* (Logger.isInfoEnabled()) { > Logger.info(predictions.collect().mkString(",")) > } > } > @Test > *def* testMLR_DenseVector() { > *val* env = ExecutionEnvironment.getExecutionEnvironment > *val* training = Seq( > *new* LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 1.0, 1.0, 0.0, > 0.0, 0.0, 0.0, 0.0, 0.0)), > *new* LabeledVector(1.0, DenseVector(1.0, 0.0, 1.0, 0.0, 0.0, 0.0, > 1.0, 0.0, 0.0, 0.0, 1.0)), > *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, > 0.0, 0.0, 0.0, 0.0, 0.0)), > *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, > 0.0, 0.0, 0.0, 0.0, 0.0)), > *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, > 0.0, 0.0, 0.0, 0.0, 0.0)), > *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, > 0.0, 0.0, 0.0, 0.0, 0.0))) > *val* testing = Seq( > *new* LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 0.0, 1.0, 0.0, > 0.0, 0.0, 0.0, 0.0, 0.0)), > *new* LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, > 0.0, 0.0, 0.0, 0.0, 0.0)), > *new* LabeledVector(0.0, 
DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, > 0.0, 0.0, 0.0, 0.0, 0.0))) > *val* trainingDS = env.fromCollection(training) > *val* testingDS = env.fromCollection(testing) > trainingDS.print() > *val* mlr = MultipleLinearRegression() > .setIterations(100) > .setStepsize(2) > .setConvergenceThreshold(0.001) > mlr.fit(trainingDS) > *val* weights = mlr.weightsOption *match* { > *case* Some(weights) => { weights.collect() } > *case* None => *throw* *new* Exception("Could not calculate the > weights.") > } > *if* (Logger.isInfoEnabled()) > Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}") > testingDS.print() > *val* predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label > ))) > *if* (Logger.isInfoEnabled()) { > Logger.info(s"**** PREDICTIONS: ${predictions.collect().mkString("," > )}") > } > } > } It fails with this error: java.lang.IllegalArgumentException: axpy only supports adding to a dense > vector but got type class org.apache.flink.ml.math.SparseVector. at org.apache.flink.ml.math.BLAS$.axpy(BLAS.scala:60) at > org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply( > GradientDescent.scala:181) at > org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply( > GradientDescent.scala:177) at org.apache.flink.api.scala.DataSet$$anon$7.reduce(DataSet.scala:583) at org.apache.flink.runtime.operators.AllReduceDriver.run( > AllReduceDriver.java:132) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489) at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run( > AbstractIterativeTask.java:144) at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run( > IterationIntermediateTask.java:92) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) If SparseVectors are 
not supported, when can we expect them to be supported for MLR? Thanks in advance for any information that you can provide. -- *Gna Phetsarath*System Architect // AOL Platforms // Data Services // Applied Research Chapter 770 Broadway, 5th Floor, New York, NY 10003 o: 212.402.4871 // m: 917.373.7363 vvmr: 8890237 aim: sphetsarath20 t: @sourigna * <http://www.aolplatforms.com>*