Hi,

Chiwan’s example is perfectly fine, and it should also work for general EM algorithms. Moreover, it is the recommended way to implement iterations with Flink. The iterateWithTermination API call generates a lazily evaluated data flow with an iteration operator. This plan is only executed when you call env.execute, or a collect or count which depends on this data flow. In the example, it would be triggered by result.print.

You can also take a look at the KMeans implementation of Flink. It does not use a dynamic convergence criterion, but one could easily be added.
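
For the column-mean example from earlier in this thread, a native iteration might look roughly like the following. This is only a sketch, assuming the Flink 1.0 Scala DataSet API: the flat DataSet[Double] is a simplified stand-in for the distributed matrix drmA, and the names MeanCenteringSketch, SubtractMean, and the broadcast name "avg" are made up for illustration. The average is passed into the update step as a broadcast set, so it stays inside the lazily evaluated plan instead of being collected on the client.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Hypothetical helper: subtracts the broadcast average from every element.
class SubtractMean extends RichMapFunction[Double, Double] {
  private var mean = 0.0

  override def open(parameters: Configuration): Unit = {
    // The broadcast set "avg" holds exactly one element: the current average.
    mean = getRuntimeContext.getBroadcastVariable[Double]("avg").get(0)
  }

  override def map(value: Double): Double = value - mean
}

object MeanCenteringSketch extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  // Assumption: a flat data set of values stands in for drmA.
  val data: DataSet[Double] =
    env.fromElements(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0)

  val result = data.iterateWithTermination(100) { prev =>
    // Statistic of the current iteration: the average of all values.
    val avg: DataSet[Double] = prev
      .map(x => (x, 1L))
      .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(t => t._1 / t._2)

    // Update step: subtract the average from every element.
    val next = prev.map(new SubtractMean).withBroadcastSet(avg, "avg")

    // Termination criterion: the iteration stops once this data set is empty,
    // which mirrors `while (avg > 1e-10)` in the original loop.
    val notConverged = avg.filter(_ > 1e-10)

    (next, notConverged)
  }

  // Nothing runs until this call triggers execution of the whole plan.
  result.print()
}
```

The statistic never leaves the cluster here. If the front end really needs the value of avg in every iteration, this pattern does not cover that case.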

If you really need to trigger the execution of the data flow for each iteration (e.g. because you have different data flows depending on the result), then you should persist the intermediate result every n iterations. Otherwise you will re-trigger the execution of the previous operators over and over.

Cheers,
Till

On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

> Thanks Chiwan.
>
> I think this example still creates a lazy-evaluated plan. And if I need to
> collect statistics to front end (and use it in subsequent iteration
> evaluation) as my example with computing column-wise averages suggests?
>
> problem generally is, what if I need to eagerly evaluate the statistics
> inside the iteration in order to proceed with further computations (and
> even plan construction). typically, that would be result of M-step in EM
> algorithm.
>
> On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <chiwanp...@apache.org>
> wrote:
>
> > Hi Dmitriy,
> >
> > I think you can implement it with iterative API with custom convergence
> > criterion. You can express the convergence criterion by two methods. One
> > is using a convergence criterion data set [1][2] and the other is
> > registering an aggregator with custom implementation of
> > `ConvergenceCriterion` interface [3].
> >
> > Here is an example using a convergence criterion data set in Scala API:
> >
> > ```
> > package flink.sample
> >
> > import org.apache.flink.api.scala._
> >
> > import scala.util.Random
> >
> > object SampleApp extends App {
> >   val env = ExecutionEnvironment.getExecutionEnvironment
> >
> >   val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> >
> >   val result = data.iterateWithTermination(5000) { prev =>
> >     // calculate sub solution
> >     val rand = Random.nextDouble()
> >     val subSolution = prev.map(_ * rand)
> >
> >     // calculate convergent condition
> >     val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)
> >
> >     (subSolution, convergence)
> >   }
> >
> >   result.print()
> > }
> > ```
> >
> > Regards,
> > Chiwan Park
> >
> > [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
> > [2]: iterateWithTermination method in https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
> > [3]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
> >
> > > On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dlie...@gmail.com>
> > > wrote:
> > >
> > > Thank you, all :)
> > >
> > > yes, that's my question. How do we construct such a loop with a
> > > concrete example?
> > >
> > > Let's take something nonsensical yet specific.
> > >
> > > Say, in samsara terms we do something like that :
> > >
> > > var avg = Double.PositiveInfinity
> > > var drmA = ... (construct elsewhere)
> > >
> > > do {
> > >   avg = drmA.colMeans.mean // average of col-wise means
> > >   drmA = drmA - avg // elementwise subtract of average
> > > } while (avg > 1e-10)
> > >
> > > (which probably does not converge in reality).
> > >
> > > How would we implement that with native iterations in flink?
> > >
> > > On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org>
> > > wrote:
> > >
> > >> Hi Dmitriy,
> > >>
> > >> I’m not sure whether I’ve understood your question correctly, so
> > >> please correct me if I’m wrong.
> > >>
> > >> So you’re asking whether it is a problem that
> > >>
> > >> stat1 = A.map.reduce
> > >> A = A.update.map(stat1)
> > >>
> > >> are executed on the same input data set A and whether we have to
> > >> cache A for that, right? I assume you’re worried that A is
> > >> calculated twice.
> > >>
> > >> Since you don’t have an API call which triggers eager execution of
> > >> the data flow, the map.reduce and map(stat1) calls will only
> > >> construct the data flow of your program. Both operators will depend
> > >> on the result of A, which is calculated only once (when execute,
> > >> collect or count is called) and then sent to the map.reduce and
> > >> map(stat1) operators.
> > >>
> > >> However, it is not recommended to use an explicit loop to do
> > >> iterative computations with Flink. The problem here is that you will
> > >> basically unroll the loop and construct a long pipeline with the
> > >> operations of each iteration. Once you execute this long pipeline
> > >> you will face considerable memory fragmentation, because every
> > >> operator will get a proportional fraction of the available memory
> > >> assigned. Even worse, if you trigger the execution of your data flow
> > >> to evaluate the convergence criterion, you will execute for each
> > >> iteration the complete pipeline which has been built up so far.
> > >> Thus, you’ll end up with a quadratic complexity in the number of
> > >> iterations. Therefore, I would highly recommend using Flink’s
> > >> built-in support for native iterations, which won’t suffer from this
> > >> problem, or at least materializing the intermediate result every n
> > >> iterations. At the moment this would mean writing the data to some
> > >> sink and then reading it from there again.
> > >>
> > >> I hope this answers your question. If not, then don’t hesitate to
> > >> ask me again.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis
> > >> <theodoros.vasilou...@gmail.com> wrote:
> > >>
> > >>> Hello Dmitriy,
> > >>>
> > >>> If I understood correctly, you are basically talking about
> > >>> modifying a DataSet as you iterate over it.
> > >>>
> > >>> AFAIK this is currently not possible in Flink, and indeed it's a
> > >>> real bottleneck for ML algorithms. This is the reason our current
> > >>> SGD implementation does a pass over the whole dataset at each
> > >>> iteration, since we cannot take a sample from the dataset and
> > >>> iterate only over that (so it's not really stochastic).
> > >>>
> > >>> The relevant JIRA is here:
> > >>> https://issues.apache.org/jira/browse/FLINK-2396
> > >>>
> > >>> I would love to start a discussion on how we can proceed to fix
> > >>> this.
> > >>>
> > >>> Regards,
> > >>> Theodore
> > >>>
> > >>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov
> > >>> <dlie...@gmail.com> wrote:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> probably more of a question for Till:
> > >>>>
> > >>>> Imagine a common ML algorithm flow that runs until convergence.
> > >>>>
> > >>>> typical distributed flow would be something like that (e.g. GMM EM
> > >>>> would be exactly like that):
> > >>>>
> > >>>> A: input
> > >>>>
> > >>>> do {
> > >>>>   stat1 = A.map.reduce
> > >>>>   A = A.update-map(stat1)
> > >>>>   conv = A.map.reduce
> > >>>> } until conv > convThreshold
> > >>>>
> > >>>> There probably could be 1 map-reduce step originating on A to
> > >>>> compute both convergence criteria statistics and update statistics
> > >>>> in one step. not the point.
> > >>>>
> > >>>> The point is that update and map.reduce originate on the same
> > >>>> dataset intermittently.
> > >>>>
> > >>>> In spark we would normally commit A to an object tree cache so
> > >>>> that data is available to subsequent map passes without any I/O or
> > >>>> serialization operations, thus ensuring high rate of iterations.
> > >>>>
> > >>>> We observe the same pattern pretty much everywhere. clustering,
> > >>>> probabilistic algorithms, even batch gradient descent of quasi
> > >>>> newton algorithms fitting.
> > >>>>
> > >>>> How do we do something like that, for example, in FlinkML?
> > >>>>
> > >>>> Thoughts?
> > >>>>
> > >>>> thanks.
> > >>>>
> > >>>> -Dmitriy