Thanks.

Regardless of the rationale, I wanted to confirm whether iterations are
lazily evaluated only. It sounds like eager evaluation (and collection)
inside an iteration is not possible, and algorithms that need it will just
have to work around this. I think this answers my question -- thanks!
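
A minimal sketch of how the mean-subtraction loop from earlier in this
thread could be expressed with iterateWithTermination, following Chiwan's
example. It is an illustration under assumptions, not a tested Samsara or
FlinkML implementation: the matrix is simplified to a flat DataSet[Double],
and the names MeanCenteringSketch, run and keepGoing are hypothetical. The
average never leaves the data flow; it is fed back through crossWithTiny
instead of being collected to the front end.

```scala
import org.apache.flink.api.scala._

object MeanCenteringSketch {
  // Hypothetical helper (not part of Flink): subtracts the mean from all
  // values repeatedly, stopping once the mean is no longer above 1e-10.
  def run(values: Seq[Double], maxIterations: Int = 100): Seq[Double] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(values)

    val result = data.iterateWithTermination(maxIterations) { prev =>
      // Single-element data set holding the mean of the current values.
      val avg = prev
        .map(x => (x, 1L))
        .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
        .map(t => t._1 / t._2)

      // Subtract the mean from every element without leaving the data flow.
      val next = prev.crossWithTiny(avg) { (x, a) => x - a }

      // The iteration continues only while this data set is non-empty.
      val keepGoing = avg.filter(_ > 1e-10)

      (next, keepGoing)
    }

    // collect() triggers execution of the lazily constructed plan.
    result.collect()
  }

  def main(args: Array[String]): Unit =
    run(Seq(1.0, 2.0, 3.0, 4.0)).foreach(println)
}
```

This covers only the case where the statistic is consumed by the next step
of the same data flow. It does not help when the plan itself has to change
depending on the collected value.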

-d

On Tue, Mar 29, 2016 at 2:53 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi,
>
> Chiwan’s example is perfectly fine and it should also work with general EM
> algorithms. Moreover, it is the recommended way to implement iterations
> with Flink. The iterateWithTermination API call generates a lazily
> evaluated data flow with an iteration operator. This plan will only be
> executed when you call env.execute, or a collect or count that depends on
> this data flow. In the example it would be triggered by result.print. You
> can also take a look at the KMeans implementation of Flink. It does not use
> a dynamic convergence criterion, but one could easily be added.
>
> If you really need to trigger the execution of the data flow for each
> iteration (e.g. because you have different data flows depending on the
> result), then you should persist the intermediate result every n
> iterations. Otherwise you will re-trigger the execution of all previous
> operators over and over.
>
> Cheers,
> Till
>
> On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dlie...@gmail.com>
> wrote:
>
> > Thanks Chiwan.
> >
> > I think this example still creates a lazily evaluated plan. What if I need
> > to collect statistics to the front end (and use them in the subsequent
> > iteration), as my example of computing column-wise averages suggests?
> >
> > The problem, generally, is: what if I need to eagerly evaluate the
> > statistics inside the iteration in order to proceed with further
> > computations (and even plan construction)? Typically, that would be the
> > result of the M-step in an EM algorithm.
> >
> > On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <chiwanp...@apache.org>
> > wrote:
> >
> > > Hi Dmitriy,
> > >
> > > I think you can implement it with the iterative API and a custom
> > > convergence criterion. You can express the convergence criterion in two
> > > ways. One is using a convergence criterion data set [1][2]; the other is
> > > registering an aggregator with a custom implementation of the
> > > `ConvergenceCriterion` interface [3].
> > >
> > > Here is an example using a convergence criterion data set in the Scala
> > > API:
> > >
> > > ```
> > > package flink.sample
> > >
> > > import org.apache.flink.api.scala._
> > >
> > > import scala.util.Random
> > >
> > > object SampleApp extends App {
> > >   val env = ExecutionEnvironment.getExecutionEnvironment
> > >
> > >   val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> > >
> > >   val result = data.iterateWithTermination(5000) { prev =>
> > >     // calculate sub solution
> > >     val rand = Random.nextDouble()
> > >     val subSolution = prev.map(_ * rand)
> > >
> > >     // calculate the convergence criterion: the iteration continues
> > >     // only while this data set is non-empty
> > >     val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)
> > >
> > >     (subSolution, convergence)
> > >   }
> > >
> > >   result.print()
> > > }
> > > ```
> > >
> > > Regards,
> > > Chiwan Park
> > >
> > > [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
> > > [2]: iterateWithTermination method in https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
> > > [3]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
> > >
> > > > On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dlie...@gmail.com>
> > > > wrote:
> > > >
> > > > Thank you, all :)
> > > >
> > > > Yes, that's my question. How do we construct such a loop with a
> > > > concrete example?
> > > >
> > > > Let's take something nonsensical yet specific.
> > > >
> > > > Say, in Samsara terms, we do something like this:
> > > >
> > > > var avg = Double.PositiveInfinity
> > > > var drmA = ... (construct elsewhere)
> > > >
> > > >
> > > >
> > > > do {
> > > >   avg = drmA.colMeans.mean // average of col-wise means
> > > >   drmA = drmA - avg // elementwise subtract of average
> > > >
> > > > } while (avg > 1e-10)
> > > >
> > > > (which probably does not converge in reality).
> > > >
> > > > How would we implement that with native iterations in Flink?
> > > >
> > > >
> > > >
> > > > On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org>
> > > > wrote:
> > > >
> > > >> Hi Dmitriy,
> > > >>
> > > > >> I’m not sure whether I’ve understood your question correctly, so
> > > > >> please correct me if I’m wrong.
> > > >>
> > > >> So you’re asking whether it is a problem that
> > > >>
> > > >> stat1 = A.map.reduce
> > > >> A = A.update.map(stat1)
> > > >>
> > > > >> are executed on the same input data set A and whether we have to
> > > > >> cache A for that, right? I assume you’re worried that A is
> > > > >> calculated twice.
> > > >>
> > > > >> Since you don’t have an API call which triggers eager execution of
> > > > >> the data flow, the map.reduce and map(stat1) calls will only
> > > > >> construct the data flow of your program. Both operators will depend
> > > > >> on the result of A, which is calculated only once (when execute,
> > > > >> collect or count is called) and then sent to the map.reduce and
> > > > >> map(stat1) operators.
> > > >>
> > > > >> However, using an explicit loop to do iterative computations with
> > > > >> Flink is not recommended. The problem is that you will basically
> > > > >> unroll the loop and construct a long pipeline containing the
> > > > >> operations of each iteration. Once you execute this long pipeline
> > > > >> you will face considerable memory fragmentation, because every
> > > > >> operator gets a proportional fraction of the available memory
> > > > >> assigned. Even worse, if you trigger the execution of your data
> > > > >> flow to evaluate the convergence criterion, each iteration will
> > > > >> execute the complete pipeline that has been built up so far. Thus,
> > > > >> you’ll end up with quadratic complexity in the number of
> > > > >> iterations. Therefore, I would highly recommend using Flink’s
> > > > >> built-in support for native iterations, which doesn’t suffer from
> > > > >> this problem, or at least materializing the intermediate result
> > > > >> every n iterations. At the moment this would mean writing the data
> > > > >> to some sink and then reading it back from there.
> > > >>
> > > > >> I hope this answers your question. If not, then don’t hesitate to
> > > > >> ask me again.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >> ​
> > > >>
> > > >> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
> > > >> theodoros.vasilou...@gmail.com> wrote:
> > > >>
> > > >>> Hello Dmitriy,
> > > >>>
> > > > >>> If I understood correctly, you are basically talking about
> > > > >>> modifying a DataSet as you iterate over it.
> > > >>>
> > > > >>> AFAIK this is currently not possible in Flink, and indeed it's a
> > > > >>> real bottleneck for ML algorithms. This is the reason our current
> > > > >>> SGD implementation does a pass over the whole dataset at each
> > > > >>> iteration, since we cannot take a sample from the dataset
> > > > >>> and iterate only over that (so it's not really stochastic).
> > > >>>
> > > >>> The relevant JIRA is here:
> > > >>> https://issues.apache.org/jira/browse/FLINK-2396
> > > >>>
> > > > >>> I would love to start a discussion on how we can proceed to fix
> > > > >>> this.
> > > >>>
> > > >>> Regards,
> > > >>> Theodore
> > > >>>
> > > > >>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com>
> > > > >>> wrote:
> > > >>>
> > > >>>> Hi,
> > > >>>>
> > > >>>> probably more of a question for Till:
> > > >>>>
> > > >>>> Imagine a common ML algorithm flow that runs until convergence.
> > > >>>>
> > > > >>>> A typical distributed flow would be something like this (e.g. GMM
> > > > >>>> EM would be exactly like that):
> > > >>>>
> > > >>>> A: input
> > > >>>>
> > > >>>> do {
> > > >>>>
> > > >>>>   stat1 = A.map.reduce
> > > >>>>   A = A.update-map(stat1)
> > > >>>>   conv = A.map.reduce
> > > >>>> } until conv > convThreshold
> > > >>>>
> > > > >>>> There probably could be one map-reduce step originating on A to
> > > > >>>> compute both the convergence criterion statistics and the update
> > > > >>>> statistics in one step. That is not the point.
> > > > >>>>
> > > > >>>> The point is that update and map.reduce originate on the same
> > > > >>>> dataset intermittently.
> > > >>>>
> > > > >>>> In Spark we would normally commit A to an object tree cache so
> > > > >>>> that the data is available to subsequent map passes without any
> > > > >>>> I/O or serialization operations, thus ensuring a high rate of
> > > > >>>> iterations.
> > > >>>>
> > > > >>>> We observe the same pattern pretty much everywhere: clustering,
> > > > >>>> probabilistic algorithms, even batch gradient descent or
> > > > >>>> quasi-Newton algorithm fitting.
> > > >>>>
> > > >>>> How do we do something like that, for example, in FlinkML?
> > > >>>>
> > > >>>> Thoughts?
> > > >>>>
> > > >>>> thanks.
> > > >>>>
> > > >>>> -Dmitriy
> > > >>>>
> > > >>>
> > > >>
> > >
> > >
> >
>
