@Shannon What you are talking about is available for the DataSet API through the iterateWithTermination function. See the API docs <https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#iteration-operators> and the Iterations page <https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html>.
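For illustration, a minimal sketch of what such a threshold-based termination criterion can look like with iterateWithTermination; the update step, the squared-loss computation, and the 1e-6 threshold are placeholder assumptions, and the iteration stops as soon as the termination data set becomes empty:

```
import org.apache.flink.api.scala._

object ThresholdCriterionSketch extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  // placeholder initial solution
  val initial = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  val result = initial.iterateWithTermination(1000) { prev =>
    // placeholder update step
    val next = prev.map(_ * 0.5)

    // placeholder squared loss of the current solution
    val squaredLoss = next.map(x => x * x).reduce(_ + _)

    // keep iterating only while the loss is still above the threshold;
    // an empty termination data set ends the iteration
    val notConverged = squaredLoss.filter(_ > 1e-6)

    (next, notConverged)
  }

  result.print()
}
```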
On Tue, Mar 29, 2016 at 3:14 PM, Shannon Quinn <squ...@gatech.edu> wrote:

Apologies for hijacking, but this thread hits right at my last message to this list (looking to implement native iterations in the PyFlink API).

I'm particularly interested in custom convergence criteria, often centered around measuring some sort of squared loss and checking whether it falls below a threshold. Is this what you mean by a "dynamic convergence criterion"? Certainly having a max-iterations cut-off as a "just in case" measure is a good thing, but I'm curious whether there's a native way of using a threshold-based criterion that doesn't involve simply iterating 10 or so times, checking the criterion, and iterating some more.

Shannon

On 3/29/16 5:53 AM, Till Rohrmann wrote:

Hi,

Chiwan's example is perfectly fine and it should also work with general EM algorithms. Moreover, it is the recommended way to implement iterations with Flink. The iterateWithTermination API call generates a lazily evaluated data flow with an iteration operator. This plan will only be executed when you call env.execute, collect, or count on something that depends on this data flow. In the example it would be triggered by result.print. You can also take a look at Flink's KMeans implementation. It does not use a dynamic convergence criterion, but one could easily be added.

If you really need to trigger the execution of the data flow for each iteration (e.g. because you have different data flows depending on the result), then you should persist the intermediate result every n iterations. Otherwise you will re-trigger the execution of the previous operators over and over.

Cheers,
Till

On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

Thanks Chiwan.

I think this example still creates a lazily evaluated plan. And what if I need to collect statistics to the front end (and use them in subsequent iteration evaluation), as my example with computing column-wise averages suggests?

The problem, generally, is: what if I need to eagerly evaluate the statistics inside the iteration in order to proceed with further computations (and even further plan construction)? Typically, that would be the result of the M-step in an EM algorithm.

On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <chiwanp...@apache.org> wrote:

Hi Dmitriy,

I think you can implement it with the iterative API and a custom convergence criterion. You can express the convergence criterion in two ways: one is using a convergence criterion data set [1][2], and the other is registering an aggregator with a custom implementation of the `ConvergenceCriterion` interface [3].
Here is an example using a convergence criterion data set in the Scala API:

```
package flink.sample

import org.apache.flink.api.scala._

import scala.util.Random

object SampleApp extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  val result = data.iterateWithTermination(5000) { prev =>
    // calculate sub-solution
    val rand = Random.nextDouble()
    val subSolution = prev.map(_ * rand)

    // calculate convergence condition
    val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)

    (subSolution, convergence)
  }

  result.print()
}
```

Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
[2]: iterateWithTermination method in https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
[3]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29

On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

Thank you, all :)

Yes, that's my question. How do we construct such a loop with a concrete example?

Let's take something nonsensical yet specific.

Say, in Samsara terms we do something like this:

var avg = Double.PositiveInfinity
var drmA = ... (constructed elsewhere)

do {
  avg = drmA.colMeans.mean // average of column-wise means
  drmA = drmA - avg        // element-wise subtraction of the average
} while (avg > 1e-10)

(which probably does not converge in reality).

How would we implement that with native iterations in Flink?
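For illustration, a rough sketch of how that loop could be expressed with iterateWithTermination; a flat DataSet[Double] stands in for drmA and the overall mean stands in for colMeans.mean (both simplifications, not Samsara semantics), and the scalar mean is fed back into the element-wise update through a cross with the singleton mean data set:

```
import org.apache.flink.api.scala._

object MeanCenteringSketch extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  // stand-in for drmA; a real distributed matrix would rather be a DataSet of (rowIndex, vector)
  val drmA = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  val result = drmA.iterateWithTermination(1000) { prev =>
    // stand-in for drmA.colMeans.mean: the mean of all elements
    val avg = prev
      .map(x => (x, 1L))
      .reduce((l, r) => (l._1 + r._1, l._2 + r._2))
      .map(t => t._1 / t._2)

    // element-wise subtraction of the mean via a cross with the singleton mean data set
    val centered = prev.cross(avg) { (x, m) => x - m }

    // continue while the mean is still above the threshold; an empty set stops the iteration
    val notConverged = avg.filter(m => math.abs(m) > 1e-10)

    (centered, notConverged)
  }

  result.print()
}
```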
On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Dmitriy,

I'm not sure whether I've understood your question correctly, so please correct me if I'm wrong.

So you're asking whether it is a problem that

stat1 = A.map.reduce
A = A.update.map(stat1)

are executed on the same input data set A and whether we have to cache A for that, right? I assume you're worried that A is calculated twice.

Since you don't have an API call which triggers eager execution of the data flow, the map.reduce and map(stat1) calls will only construct the data flow of your program. Both operators will depend on the result of A, which is calculated only once (when execute, collect, or count is called) and then sent to the map.reduce and map(stat1) operators.

However, it is not recommended to use an explicit loop to do iterative computations with Flink. The problem is that you will basically unroll the loop and construct a long pipeline with the operations of each iteration. Once you execute this long pipeline you will face considerable memory fragmentation, because every operator will get a proportional fraction of the available memory assigned. Even worse, if you trigger the execution of your data flow to evaluate the convergence criterion, you will execute the complete pipeline built up so far for each iteration. Thus, you'll end up with quadratic complexity in the number of iterations. Therefore, I would highly recommend using Flink's built-in support for native iterations, which doesn't suffer from this problem, or at least materializing the intermediate result every n iterations. At the moment this would mean writing the data to some sink and then reading it back from there.

I hope this answers your question. If not, then don't hesitate to ask me again.

Cheers,
Till
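For illustration, a rough sketch of that fallback, materializing the intermediate result every few iterations in a driver loop so each round starts from the written data instead of the whole pipeline built so far; the path, the update step, the 10 native iterations per round, and the convergence check are placeholder assumptions:

```
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object MaterializeEveryNSketch extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val checkpointPath = "/tmp/iteration-checkpoint" // placeholder path

  var current: DataSet[Double] = env.fromElements[Double](1, 2, 3, 4, 5)
  var converged = false
  var round = 0

  while (!converged && round < 100) {
    // run a bounded number of native iterations per round (placeholder update step)
    val updated = current.iterate(10) { prev => prev.map(_ * 0.9) }

    // materialize the intermediate result so the next round does not
    // re-execute the pipeline built up in previous rounds
    val path = checkpointPath + "-" + round
    updated.writeAsText(path, WriteMode.OVERWRITE)
    env.execute("round " + round)

    // eagerly evaluate the convergence statistic on the driver (placeholder criterion)
    val stat = env.readTextFile(path).map(_.toDouble).reduce(_ + _).collect().head
    converged = stat < 1e-3

    // the materialized result becomes the input of the next round
    current = env.readTextFile(path).map(_.toDouble)
    round += 1
  }
}
```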
On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:

Hello Dmitriy,

If I understood correctly, what you are basically talking about is modifying a DataSet as you iterate over it.

AFAIK this is currently not possible in Flink, and indeed it's a real bottleneck for ML algorithms. This is the reason our current SGD implementation does a pass over the whole dataset at each iteration, since we cannot take a sample from the dataset and iterate only over that (so it's not really stochastic).

The relevant JIRA is here: https://issues.apache.org/jira/browse/FLINK-2396

I would love to start a discussion on how we can proceed to fix this.

Regards,
Theodore

On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

Hi,

probably more of a question for Till:

Imagine a common ML algorithm flow that runs until convergence.

A typical distributed flow would be something like this (e.g. GMM EM would be exactly like that):

A: input

do {
  stat1 = A.map.reduce
  A = A.update-map(stat1)
  conv = A.map.reduce
} until conv > convThreshold

There could probably be one map-reduce step originating on A to compute both the convergence criterion statistics and the update statistics in one pass, but that's not the point.

The point is that the update and the map.reduce originate on the same dataset intermittently.

In Spark we would normally commit A to an object tree cache so that the data is available to subsequent map passes without any I/O or serialization, thus ensuring a high rate of iterations.

We observe the same pattern pretty much everywhere: clustering, probabilistic algorithms, even batch gradient descent or quasi-Newton algorithm fitting.

How do we do something like that, for example, in FlinkML?

Thoughts?

thanks.

-Dmitriy
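For illustration, a rough sketch of how such a stat1/update round could look inside a native iteration, shipping the per-iteration statistic to the update mappers as a broadcast variable, the same pattern Flink's KMeans example uses for the centroids; the input, the statistic, the update step, and the 1e-10 threshold are placeholder assumptions:

```
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object BroadcastStatSketch extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  // stand-in for the input A
  val a = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  val result = a.iterateWithTermination(1000) { prev =>
    // stat1 = A.map.reduce (here simply the mean, as a placeholder statistic)
    val stat1 = prev
      .map(x => (x, 1L))
      .reduce((l, r) => (l._1 + r._1, l._2 + r._2))
      .map(t => t._1 / t._2)

    // A = A.update-map(stat1): the statistic reaches the mappers as a broadcast variable
    val updated = prev.map(new RichMapFunction[Double, Double] {
      private var stat: Double = 0.0

      override def open(parameters: Configuration): Unit = {
        stat = getRuntimeContext.getBroadcastVariable[Double]("stat1").get(0)
      }

      override def map(x: Double): Double = x - stat
    }).withBroadcastSet(stat1, "stat1")

    // conv check: continue while the statistic is still above the threshold
    val notConverged = stat1.filter(s => math.abs(s) > 1e-10)

    (updated, notConverged)
  }

  result.print()
}
```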