Hi Sourigna,

it turned out to be a bug in the GradientDescent implementation which
cannot handle sparse gradients. That is not so problematic by itself,
because the sum of gradient vectors is usually dense even if the individual
gradient vectors are sparse. We simply forgot to initialize the initial
vector of the reduce operation to be dense. I’ve created a PR [1] which
should fix the problem. After reviewing it, it should be merged in the next
days.

[1] https://github.com/apache/flink/pull/1587

Cheers,
Till
​

On Thu, Feb 4, 2016 at 5:09 AM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Gna,
>
> Thanks for reporting the problem. Because level 1 operation in FlinkML
> BLAS library doesn’t support SparseVector, SparseVector is not supported
> currently. I’ve filed this to JIRA [1].
>
> Maybe I can send a patch to solve this in few days.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-3330
>
> Regards,
> Chiwan Park
>
> > On Feb 4, 2016, at 5:39 AM, Sourigna Phetsarath <
> gna.phetsar...@teamaol.com> wrote:
> >
> > All:
> >
> > I'm trying to use SparseVectors with FlinkML 0.10.1.  It does not seem
> to be working.  Here is a UnitTest that I created to recreate the problem:
> >
> >
> > package com.aol.ds.arc.ml.poc.flink
> >
> > import org.junit.After
> > import org.junit.Before
> > import org.slf4j.LoggerFactory
> > import org.apache.flink.test.util.ForkableFlinkMiniCluster
> > import scala.concurrent.duration.FiniteDuration
> > import java.util.concurrent.TimeUnit
> > import org.apache.flink.test.util.TestBaseUtils
> > import org.apache.flink.runtime.StreamingMode
> > import org.apache.flink.test.util.TestEnvironment
> > import org.junit.Test
> > import org.apache.flink.ml.common.LabeledVector
> > import org.apache.flink.ml.math.SparseVector
> > import org.apache.flink.api.scala._
> > import org.apache.flink.ml.regression.MultipleLinearRegression
> > import org.apache.flink.ml.math.DenseVector
> > class FlinkMLRTest {
> >   var Logger = LoggerFactory.getLogger(getClass.getName)
> >   var cluster: Option[ForkableFlinkMiniCluster] = None
> >   val parallelism = 4
> >   val DEFAULT_AKKA_ASK_TIMEOUT = 1000
> >   val DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT,
> TimeUnit.SECONDS)
> >   @Before
> >   def doBefore(): Unit = {
> >     val cl = TestBaseUtils.startCluster(
> >       1,
> >       parallelism,
> >       StreamingMode.BATCH_ONLY,
> >       false,
> >       false,
> >       true)
> >     val clusterEnvironment = new TestEnvironment(cl, parallelism)
> >     clusterEnvironment.setAsContext()
> >     cluster = Some(cl)
> >   }
> >   @After
> >   def doAfter(): Unit = {
> >     cluster.map(c => TestBaseUtils.stopCluster(c, DEFAULT_TIMEOUT))
> >   }
> >   @Test
> >   def testMLR() {
> >     val env = ExecutionEnvironment.getExecutionEnvironment
> >     val training = Seq(
> >       new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3),
> Array(1.0, 1.0, 1.0))),
> >       new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9),
> Array(1.0, 1.0, 1.0, 1.0))),
> >       new LabeledVector(0.0, new SparseVector(10, Array(0, 2),
> Array(0.0, 1.0))),
> >       new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
> >       new LabeledVector(0.0, new SparseVector(10, Array(0, 2),
> Array(0.0, 1.0))),
> >       new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
> >     val testing = Seq(
> >       new LabeledVector(1.0, new SparseVector(10, Array(0, 3),
> Array(1.0, 1.0))),
> >       new LabeledVector(0.0, new SparseVector(10, Array(0, 2, 3),
> Array(0.0, 1.0, 1.0))),
> >       new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
> >     val trainingDS = env.fromCollection(training)
> >     val testingDS = env.fromCollection(testing)
> >     trainingDS.print()
> >     val mlr = MultipleLinearRegression()
> >       .setIterations(100)
> >       .setStepsize(2)
> >       .setConvergenceThreshold(0.001)
> >     mlr.fit(trainingDS)
> >     val weights = mlr.weightsOption match {
> >       case Some(weights) => { weights.collect() }
> >       case None => throw new Exception("Could not calculate the
> weights.")
> >     }
> >     if (Logger.isInfoEnabled())
> >       Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
> >     testingDS.print()
> >     val predictions = mlr.evaluate(testingDS.map(x => (x.vector,
> x.label)))
> >     if (Logger.isInfoEnabled()) {
> >       Logger.info(predictions.collect().mkString(","))
> >     }
> >   }
> >   @Test
> >   def testMLR_DenseVector() {
> >     val env = ExecutionEnvironment.getExecutionEnvironment
> >     val training = Seq(
> >       new LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 1.0, 1.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> >       new LabeledVector(1.0, DenseVector(1.0, 0.0, 1.0, 0.0, 0.0, 0.0,
> 1.0, 0.0, 0.0, 0.0, 1.0)),
> >       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> >       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> >       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> >       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)))
> >     val testing = Seq(
> >       new LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 0.0, 1.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> >       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 1.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)),
> >       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0)))
> >     val trainingDS = env.fromCollection(training)
> >     val testingDS = env.fromCollection(testing)
> >     trainingDS.print()
> >     val mlr = MultipleLinearRegression()
> >       .setIterations(100)
> >       .setStepsize(2)
> >       .setConvergenceThreshold(0.001)
> >     mlr.fit(trainingDS)
> >     val weights = mlr.weightsOption match {
> >       case Some(weights) => { weights.collect() }
> >       case None => throw new Exception("Could not calculate the
> weights.")
> >     }
> >     if (Logger.isInfoEnabled())
> >       Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
> >     testingDS.print()
> >     val predictions = mlr.evaluate(testingDS.map(x => (x.vector,
> x.label)))
> >     if (Logger.isInfoEnabled()) {
> >       Logger.info(s"**** PREDICTIONS:
> ${predictions.collect().mkString(",")}")
> >     }
> >   }
> > }
> >
> > It fails with this error:
> >
> > java.lang.IllegalArgumentException: axpy only supports adding to a dense
> vector but got type class org.apache.flink.ml.math.SparseVector.
> > at org.apache.flink.ml.math.BLAS$.axpy(BLAS.scala:60)
> > at
> org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(GradientDescent.scala:181)
> > at
> org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(GradientDescent.scala:177)
> > at org.apache.flink.api.scala.DataSet$$anon$7.reduce(DataSet.scala:583)
> > at
> org.apache.flink.runtime.operators.AllReduceDriver.run(AllReduceDriver.java:132)
> > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
> > at
> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:144)
> > at
> org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
> > at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > If SparseVectors are not supported, when can we expect them to be
> supported for MLR?
> >
> > Thanks in advance for any information that you can provide.
> > --
> > Gna Phetsarath
> > System Architect // AOL Platforms // Data Services // Applied Research
> Chapter
> > 770 Broadway, 5th Floor, New York, NY 10003
> > o: 212.402.4871 // m: 917.373.7363
> > vvmr: 8890237 aim: sphetsarath20 t: @sourigna
> >
> >
> >
>
>

Reply via email to