Thanks. Regardless of the rationale, I wanted to confirm whether iteration is a lazily-evaluated-only construct, and it sounds like eager evaluation (and collection) inside an iteration is not possible, so the algorithms that need it will just have to work around this. I think this answers my question -- thanks!
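
For the record, here is roughly what that workaround might look like in my case: an eager driver-side loop that collect()s the statistic and, as Till suggests below, materializes the intermediate result every n iterations. This is only a sketch; the toy data, the 1e-10 threshold, and the `checkpointEvery`/`maxIterations` knobs are made up for illustration:

```
import org.apache.flink.api.scala._

object EagerLoopSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // toy stand-in for drmA; checkpointEvery and maxIterations are made-up knobs
    val checkpointEvery = 5
    val maxIterations = 100

    var drmA = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    var avg = Double.PositiveInfinity
    var i = 0

    while (math.abs(avg) > 1e-10 && i < maxIterations) {
      // collect() eagerly executes the plan built so far, so the statistic
      // is available on the driver and can drive further plan construction
      val (sum, count) = drmA
        .map(x => (x, 1L))
        .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
        .collect()
        .head
      val mean = sum / count
      avg = mean

      // extend the (still lazy) plan with the update step
      drmA = drmA.map(x => x - mean)

      i += 1
      if (i % checkpointEvery == 0) {
        // materialize the intermediate result so the previous operators are
        // not re-executed over and over as the unrolled pipeline keeps growing
        drmA = env.fromCollection(drmA.collect())
      }
    }
  }
}
```

The obvious downside, as discussed below, is that every collect re-runs the pipeline built so far unless the intermediate result is materialized.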
-d

On Tue, Mar 29, 2016 at 2:53 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi,
>
> Chiwan's example is perfectly fine and it should also work with general EM algorithms. Moreover, it is the recommended way to implement iterations with Flink. The iterateWithTermination API call generates a lazily evaluated data flow with an iteration operator. This plan will only be executed when you call env.execute, or collect or count on a data set which depends on this data flow. In the example it would be triggered by result.print. You can also take a look at the KMeans implementation of Flink. It does not use a dynamic convergence criterion, but one could easily be added.
>
> If you really need to trigger the execution of the data flow for each iteration (e.g. because you have different data flows depending on the result), then you should persist the intermediate result every n iterations. Otherwise you will re-trigger the execution of the previous operators over and over again.
>
> Cheers,
> Till
>
>
> On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>
> > Thanks Chiwan.
> >
> > I think this example still creates a lazily evaluated plan. And what if I need to collect statistics to the front end (and use them in subsequent iteration evaluation), as my example with computing column-wise averages suggests?
> >
> > The problem, generally, is: what if I need to eagerly evaluate the statistics inside the iteration in order to proceed with further computations (and even plan construction)? Typically, that would be the result of the M-step in an EM algorithm.
> >
> > On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> >
> > > Hi Dmitriy,
> > >
> > > I think you can implement it with the iteration API with a custom convergence criterion. You can express the convergence criterion in two ways: one is using a convergence criterion data set [1][2], and the other is registering an aggregator with a custom implementation of the `ConvergenceCriterion` interface [3].
> > > Here is an example using a convergence criterion data set in the Scala API:
> > >
> > > ```
> > > package flink.sample
> > >
> > > import org.apache.flink.api.scala._
> > >
> > > import scala.util.Random
> > >
> > > object SampleApp extends App {
> > >   val env = ExecutionEnvironment.getExecutionEnvironment
> > >
> > >   val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> > >
> > >   val result = data.iterateWithTermination(5000) { prev =>
> > >     // calculate sub-solution
> > >     val rand = Random.nextDouble()
> > >     val subSolution = prev.map(_ * rand)
> > >
> > >     // calculate convergence condition
> > >     val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)
> > >
> > >     (subSolution, convergence)
> > >   }
> > >
> > >   result.print()
> > > }
> > > ```
> > >
> > > Regards,
> > > Chiwan Park
> > >
> > > [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
> > > [2]: the iterateWithTermination method in https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
> > > [3]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
> > >
> > > > On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
> > > >
> > > > Thank you, all :)
> > > >
> > > > Yes, that's my question. How do we construct such a loop, with a concrete example?
> > > >
> > > > Let's take something nonsensical yet specific.
> > > >
> > > > Say, in Samsara terms we do something like this:
> > > >
> > > > var avg = Double.PositiveInfinity
> > > > var drmA = ... (constructed elsewhere)
> > > >
> > > > do {
> > > >   avg = drmA.colMeans.mean // average of column-wise means
> > > >   drmA = drmA - avg        // element-wise subtraction of the average
> > > > } while (avg > 1e-10)
> > > >
> > > > (which probably does not converge in reality).
> > > >
> > > > How would we implement that with native iterations in Flink?
> > > >
> > > >
> > > > On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> > > >
> > > >> Hi Dmitriy,
> > > >>
> > > >> I'm not sure whether I've understood your question correctly, so please correct me if I'm wrong.
> > > >>
> > > >> So you're asking whether it is a problem that
> > > >>
> > > >> stat1 = A.map.reduce
> > > >> A = A.update.map(stat1)
> > > >>
> > > >> are executed on the same input data set A and whether we have to cache A for that, right? I assume you're worried that A is calculated twice.
> > > >>
> > > >> Since you don't have an API call which triggers eager execution of the data flow, the map.reduce and map(stat1) calls will only construct the data flow of your program. Both operators will depend on the result of A, which is calculated only once (when execute, collect or count is called) and then sent to the map.reduce and map(stat1) operators.
> > > >>
> > > >> However, it is not recommended to use an explicit loop to do iterative computations with Flink.
> > > >> The problem here is that you will basically unroll the loop and construct a long pipeline with the operations of each iteration. Once you execute this long pipeline you will face considerable memory fragmentation, because every operator will get a proportional fraction of the available memory assigned. Even worse, if you trigger the execution of your data flow to evaluate the convergence criterion, you will execute for each iteration the complete pipeline which has been built up so far. Thus, you'll end up with a quadratic complexity in the number of iterations. Therefore, I would highly recommend using Flink's built-in support for native iterations, which won't suffer from this problem, or at least materializing the intermediate result every n iterations. At the moment this would mean writing the data to some sink and then reading it from there again.
> > > >>
> > > >> I hope this answers your question. If not, then don't hesitate to ask me again.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >>
> > > >> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
> > > >>
> > > >>> Hello Dmitriy,
> > > >>>
> > > >>> If I understood correctly, what you are basically talking about is modifying a DataSet as you iterate over it.
> > > >>>
> > > >>> AFAIK this is currently not possible in Flink, and indeed it's a real bottleneck for ML algorithms. This is the reason our current SGD implementation does a pass over the whole dataset at each iteration, since we cannot take a sample from the dataset and iterate only over that (so it's not really stochastic).
> > > >>>
> > > >>> The relevant JIRA is here: https://issues.apache.org/jira/browse/FLINK-2396
> > > >>>
> > > >>> I would love to start a discussion on how we can proceed to fix this.
> > > >>>
> > > >>> Regards,
> > > >>> Theodore
> > > >>>
> > > >>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi,
> > > >>>>
> > > >>>> probably more of a question for Till:
> > > >>>>
> > > >>>> Imagine a common ML algorithm flow that runs until convergence.
> > > >>>>
> > > >>>> A typical distributed flow would be something like this (e.g. GMM EM would be exactly like that):
> > > >>>>
> > > >>>> A: input
> > > >>>>
> > > >>>> do {
> > > >>>>   stat1 = A.map.reduce
> > > >>>>   A = A.update-map(stat1)
> > > >>>>   conv = A.map.reduce
> > > >>>> } until conv > convThreshold
> > > >>>>
> > > >>>> There could probably be a single map-reduce step originating on A to compute both the convergence criterion statistics and the update statistics in one pass, but that's not the point.
> > > >>>>
> > > >>>> The point is that the update and the map.reduce originate on the same dataset intermittently.
> > > >>>>
> > > >>>> In Spark we would normally commit A to an object tree cache so that the data is available to subsequent map passes without any I/O or serialization operations, thus ensuring a high rate of iterations.
> > > >>>>
> > > >>>> We observe the same pattern pretty much everywhere:
> > > >>>> clustering, probabilistic algorithms, even batch gradient descent or quasi-Newton algorithm fitting.
> > > >>>>
> > > >>>> How do we do something like that, for example, in FlinkML?
> > > >>>>
> > > >>>> Thoughts?
> > > >>>>
> > > >>>> thanks.
> > > >>>>
> > > >>>> -Dmitriy
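
P.S. For completeness, a rough sketch of how the colMeans-style loop above might be expressed with the native iterateWithTermination API, assuming the "matrix" degenerates to a DataSet[Double] of cell values (so colMeans.mean is just a plain mean). The 5000 iteration cap and the crossWithTiny trick for combining the scalar with every element are only illustrative choices, not the definitive way to do it:

```
import org.apache.flink.api.scala._

object NativeIterationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // toy stand-in for drmA, flattened to a data set of cell values
    val drmA = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    val result = drmA.iterateWithTermination(5000) { prev =>
      // avg = mean of all values (stand-in for drmA.colMeans.mean)
      val avg = prev
        .map(x => (x, 1L))
        .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
        .map(t => t._1 / t._2)

      // drmA = drmA - avg, element-wise; the single-element avg data set is
      // combined with every cell via crossWithTiny
      val next = prev.crossWithTiny(avg).map(p => p._1 - p._2)

      // the iteration stops once the termination data set becomes empty,
      // i.e. when the mean has dropped to (or below) the threshold
      val termination = avg.filter(a => math.abs(a) > 1e-10)

      (next, termination)
    }

    result.print()
  }
}
```

Note that here the convergence statistic lives entirely inside the data flow and is never surfaced to the driver, which is exactly the limitation discussed above.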