Hi Dmitriy,

I think you can implement it with Flink's iteration API and a custom convergence criterion. You can express the convergence criterion in two ways: one is using a convergence criterion data set [1][2], and the other is registering an aggregator together with a custom implementation of the `ConvergenceCriterion` interface [3].
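For the second approach, the criterion itself is just a small class implementing `ConvergenceCriterion`. A minimal sketch in Scala (the class name and threshold are only illustrative), assuming you aggregate a `DoubleValue` per superstep and want to stop once it falls below some threshold:

```
import org.apache.flink.api.common.aggregators.ConvergenceCriterion
import org.apache.flink.types.DoubleValue

// Hypothetical criterion: considered converged once the aggregated
// value of the last superstep drops below the given threshold.
class BelowThresholdCriterion(threshold: Double) extends ConvergenceCriterion[DoubleValue] {
  override def isConverged(iteration: Int, value: DoubleValue): Boolean =
    value.getValue < threshold
}
```

You would register this together with an aggregator (e.g. a `DoubleSumAggregator`) via `registerAggregationConvergenceCriterion` on the Java `IterativeDataSet` as shown in [3], and the user functions inside the iteration push values into that aggregator through `getIterationRuntimeContext.getIterationAggregator`.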
Here is an example using a convergence criterion data set with the Scala API:

```
package flink.sample

import org.apache.flink.api.scala._

import scala.util.Random

object SampleApp extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  val result = data.iterateWithTermination(5000) { prev =>
    // calculate sub solution
    val rand = Random.nextDouble()
    val subSolution = prev.map(_ * rand)

    // calculate convergence condition; the iteration stops when this
    // data set becomes empty (or after 5000 iterations)
    val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)

    (subSolution, convergence)
  }

  result.print()
}
```

Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
[2]: iterateWithTermination method in https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
[3]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29

> On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>
> Thank you, all :)
>
> yes, that's my question. How do we construct such a loop with a concrete example?
>
> Let's take something nonsensical yet specific.
>
> Say, in Samsara terms we do something like this:
>
> var avg = Double.PositiveInfinity
> var drmA = ... (construct elsewhere)
>
> do {
>   avg = drmA.colMeans.mean // average of col-wise means
>   drmA = drmA - avg // elementwise subtract of average
> } while (avg > 1e-10)
>
> (which probably does not converge in reality).
>
> How would we implement that with native iterations in Flink?
>
> On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Dmitriy,
>>
>> I'm not sure whether I've understood your question correctly, so please correct me if I'm wrong.
>>
>> So you're asking whether it is a problem that
>>
>> stat1 = A.map.reduce
>> A = A.update.map(stat1)
>>
>> are executed on the same input data set A and whether we have to cache A for that, right? I assume you're worried that A is calculated twice.
>>
>> Since you don't have an API call which triggers eager execution of the data flow, the map.reduce and map(stat1) calls will only construct the data flow of your program. Both operators will depend on the result of A, which is calculated only once (when execute, collect or count is called) and then sent to the map.reduce and map(stat1) operators.
>>
>> However, it is not recommended to use an explicit loop to do iterative computations with Flink. The problem here is that you will basically unroll the loop and construct a long pipeline with the operations of each iteration. Once you execute this long pipeline you will face considerable memory fragmentation, because every operator will get a proportional fraction of the available memory assigned. Even worse, if you trigger the execution of your data flow to evaluate the convergence criterion, you will execute for each iteration the complete pipeline which has been built up so far. Thus, you'll end up with a quadratic complexity in the number of iterations.
>> Therefore, I would highly recommend using Flink's built-in support for native iterations, which won't suffer from this problem, or at least materializing the intermediate result every n iterations. At the moment this would mean writing the data to some sink and then reading it from there again.
>>
>> I hope this answers your question. If not, then don't hesitate to ask me again.
>>
>> Cheers,
>> Till
>>
>> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
>>
>>> Hello Dmitriy,
>>>
>>> If I understood correctly, what you are basically talking about is modifying a DataSet as you iterate over it.
>>>
>>> AFAIK this is currently not possible in Flink, and indeed it's a real bottleneck for ML algorithms. This is the reason our current SGD implementation does a pass over the whole dataset at each iteration, since we cannot take a sample from the dataset and iterate only over that (so it's not really stochastic).
>>>
>>> The relevant JIRA is here:
>>> https://issues.apache.org/jira/browse/FLINK-2396
>>>
>>> I would love to start a discussion on how we can proceed to fix this.
>>>
>>> Regards,
>>> Theodore
>>>
>>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> probably more of a question for Till:
>>>>
>>>> Imagine a common ML algorithm flow that runs until convergence.
>>>>
>>>> A typical distributed flow would be something like this (e.g. GMM EM would be exactly like that):
>>>>
>>>> A: input
>>>>
>>>> do {
>>>>   stat1 = A.map.reduce
>>>>   A = A.update-map(stat1)
>>>>   conv = A.map.reduce
>>>> } until conv > convThreshold
>>>>
>>>> There could probably be one map-reduce step originating on A to compute both the convergence criterion statistics and the update statistics in one step. Not the point.
>>>>
>>>> The point is that the update and the map.reduce originate on the same dataset intermittently.
>>>>
>>>> In Spark we would normally commit A to an object tree cache so that the data is available to subsequent map passes without any I/O or serialization operations, thus ensuring a high rate of iterations.
>>>>
>>>> We observe the same pattern pretty much everywhere: clustering, probabilistic algorithms, even batch gradient descent or quasi-Newton algorithm fitting.
>>>>
>>>> How do we do something like that, for example, in FlinkML?
>>>>
>>>> Thoughts?
>>>>
>>>> thanks.
>>>>
>>>> -Dmitriy
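For the concrete (nonsensical) loop in the quoted mail, a rough, untested sketch with `iterateWithTermination` might look like the following. It models the matrix as a `DataSet` of rows (`Array[Double]`) purely for illustration (names like `ColMeanLoop` and `rows` are made up), and keeps iterating while the scalar average is still above the threshold; the loop stops as soon as the termination data set becomes empty:

```
package flink.sample

import org.apache.flink.api.scala._

object ColMeanLoop extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  // hypothetical input: a small dense matrix represented as rows
  val rows: DataSet[Array[Double]] = env.fromElements(
    Array(1.0, 2.0, 3.0),
    Array(4.0, 5.0, 6.0),
    Array(7.0, 8.0, 9.0))

  val result = rows.iterateWithTermination(1000) { prev =>
    // avg = mean of the column-wise means, as a single-element data set
    val avg: DataSet[Double] = prev
      .map(row => (row, 1L))
      .reduce((a, b) => (a._1.zip(b._1).map(p => p._1 + p._2), a._2 + b._2))
      .map { case (colSums, n) => colSums.map(_ / n).sum / colSums.length }

    // drmA = drmA - avg: subtract the scalar from every element
    val next = prev.crossWithTiny(avg).map { case (row, a) => row.map(_ - a) }

    // keep iterating while avg > 1e-10; an empty data set terminates the loop
    val termination = avg.filter(_ > 1e-10)

    (next, termination)
  }

  // print rows in readable form
  result.map(_.mkString(", ")).print()
}
```

The per-iteration statistic (here the scalar average) and the updated data set are both derived from `prev` inside the step function, so nothing is unrolled into a long pipeline and no explicit caching is needed.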