Thanks Chiwan.

I think this example still creates a lazy-evaluated plan. And if i need to
collect statistics to front end (and use it in subsequent iteration
evaluation) as my example with computing column-wise averages suggests?

problem generally is, what if I need to eagerly evaluate the statistics
inside the iteration in order to proceed with further computations (and
even plan construction). typically, that would be result of M-step in EM
algorithm.

On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Dmitriy,
>
> I think you can implement it with iterative API with custom convergence
> criterion. You can express the convergence criterion by two methods. One is
> using a convergence criterion data set [1][2] and the other is registering
> an aggregator with custom implementation of `ConvergenceCriterion`
> interface [3].
>
> Here is an example using a convergence criterion data set in Scala API:
>
> ```
> package flink.sample
>
> import org.apache.flink.api.scala._
>
> import scala.util.Random
>
> object SampleApp extends App {
>   val env = ExecutionEnvironment.getExecutionEnvironment
>
>   val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>
>   val result = data.iterateWithTermination(5000) { prev =>
>     // calculate sub solution
>     val rand = Random.nextDouble()
>     val subSolution = prev.map(_ * rand)
>
>     // calculate convergent condition
>     val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)
>
>     (subSolution, convergence)
>   }
>
>   result.print()
> }
> ```
>
> Regards,
> Chiwan Park
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
> [2]: iterateWithTermination method in
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
> [3]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
>
> > On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
> >
> > Thank you, all :)
> >
> > yes, that's my question. How do we construct such a loop with a concrete
> > example?
> >
> > Let's take something nonsensical yet specific.
> >
> > Say, in samsara terms we do something like that :
> >
> > var avg = Double.PositiveInfinity
> > var drmA = ... (construct elsewhere)
> >
> >
> >
> > do {
> >   avg = drmA.colMeans.mean // average of col-wise means
> >   drmA = drmA - avg // elementwise subtract of average
> >
> > } while (avg > 1e-10)
> >
> > (which probably does not converge in reality).
> >
> > How would we implement that with native iterations in flink?
> >
> >
> >
> > On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
> >
> >> Hi Dmitriy,
> >>
> >> I’m not sure whether I’ve understood your question correctly, so please
> >> correct me if I’m wrong.
> >>
> >> So you’re asking whether it is a problem that
> >>
> >> stat1 = A.map.reduce
> >> A = A.update.map(stat1)
> >>
> >> are executed on the same input data set A and whether we have to cache A
> >> for that, right? I assume you’re worried that A is calculated twice.
> >>
> >> Since you don’t have a API call which triggers eager execution of the
> data
> >> flow, the map.reduce and map(stat1) call will only construct the data
> flow
> >> of your program. Both operators will depend on the result of A which is
> >> only once calculated (when execute, collect or count is called) and then
> >> sent to the map.reduce and map(stat1) operator.
> >>
> >> However, it is not recommended using an explicit loop to do iterative
> >> computations with Flink. The problem here is that you will basically
> unroll
> >> the loop and construct a long pipeline with the operations of each
> >> iterations. Once you execute this long pipeline you will face
> considerable
> >> memory fragmentation, because every operator will get a proportional
> >> fraction of the available memory assigned. Even worse, if you trigger
> the
> >> execution of your data flow to evaluate the convergence criterion, you
> will
> >> execute for each iteration the complete pipeline which has been built
> up so
> >> far. Thus, you’ll end up with a quadratic complexity in the number of
> >> iterations. Therefore, I would highly recommend using Flink’s built in
> >> support for native iterations which won’t suffer from this problem or to
> >> materialize at least for every n iterations the intermediate result. At
> the
> >> moment this would mean to write the data to some sink and then reading
> it
> >> from there again.
> >>
> >> I hope this answers your question. If not, then don’t hesitate to ask me
> >> again.
> >>
> >> Cheers,
> >> Till
> >> ​
> >>
> >> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
> >> theodoros.vasilou...@gmail.com> wrote:
> >>
> >>> Hello Dmitriy,
> >>>
> >>> If I understood correctly what you are basically talking about
> modifying
> >> a
> >>> DataSet as you iterate over it.
> >>>
> >>> AFAIK this is currently not possible in Flink, and indeed it's a real
> >>> bottleneck for ML algorithms. This is the reason our current
> >>> SGD implementation does a pass over the whole dataset at each
> iteration,
> >>> since we cannot take a sample from the dataset
> >>> and iterate only over that (so it's not really stochastic).
> >>>
> >>> The relevant JIRA is here:
> >>> https://issues.apache.org/jira/browse/FLINK-2396
> >>>
> >>> I would love to start a discussion on how we can proceed to fix this.
> >>>
> >>> Regards,
> >>> Theodore
> >>>
> >>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> probably more of a question for Till:
> >>>>
> >>>> Imagine a common ML algorithm flow that runs until convergence.
> >>>>
> >>>> typical distributed flow would be something like that (e.g. GMM EM
> >> would
> >>> be
> >>>> exactly like that):
> >>>>
> >>>> A: input
> >>>>
> >>>> do {
> >>>>
> >>>>   stat1 = A.map.reduce
> >>>>   A = A.update-map(stat1)
> >>>>   conv = A.map.reduce
> >>>> } until conv > convThreshold
> >>>>
> >>>> There probably could be 1 map-reduce step originating on A to compute
> >>> both
> >>>> convergence criteria statistics and udpate statistics in one step. not
> >>> the
> >>>> point.
> >>>>
> >>>> The point is that update and map.reduce originate on the same dataset
> >>>> intermittently.
> >>>>
> >>>> In spark we would normally commit A to a object tree cache so that
> data
> >>> is
> >>>> available to subsequent map passes without any I/O or serialization
> >>>> operations, thus insuring high rate of iterations.
> >>>>
> >>>> We observe the same pattern pretty much everywhere. clustering,
> >>>> probabilistic algorithms, even batch gradient descent of quasi newton
> >>>> algorithms fitting.
> >>>>
> >>>> How do we do something like that, for example, in FlinkML?
> >>>>
> >>>> Thoughts?
> >>>>
> >>>> thanks.
> >>>>
> >>>> -Dmitriy
> >>>>
> >>>
> >>
>
>

Reply via email to