Thank you, all :) Yes, that's my question. How do we construct such a loop with a concrete example?
Let's take something nonsensical yet specific. Say, in Samsara terms we do
something like this:

    var avg = Double.PositiveInfinity
    var drmA = ... // constructed elsewhere
    do {
      avg = drmA.colMeans.mean // average of column-wise means
      drmA = drmA - avg        // elementwise subtraction of the average
    } while (avg > 1e-10)

(which probably does not converge in reality). How would we implement that
with native iterations in Flink?

On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Dmitriy,
>
> I'm not sure whether I've understood your question correctly, so please
> correct me if I'm wrong.
>
> So you're asking whether it is a problem that
>
>     stat1 = A.map.reduce
>     A = A.update.map(stat1)
>
> are executed on the same input data set A and whether we have to cache A
> for that, right? I assume you're worried that A is calculated twice.
>
> Since you don't have an API call which triggers eager execution of the
> data flow, the map.reduce and map(stat1) calls will only construct the
> data flow of your program. Both operators will depend on the result of A,
> which is calculated only once (when execute, collect or count is called)
> and then sent to the map.reduce and map(stat1) operators.
>
> However, it is not recommended to use an explicit loop for iterative
> computations with Flink. The problem here is that you will basically
> unroll the loop and construct a long pipeline with the operations of each
> iteration. Once you execute this long pipeline, you will face considerable
> memory fragmentation, because every operator will get a proportional
> fraction of the available memory assigned. Even worse, if you trigger the
> execution of your data flow to evaluate the convergence criterion, you
> will execute, for each iteration, the complete pipeline which has been
> built up so far. Thus, you'll end up with a quadratic complexity in the
> number of iterations.
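[Till's quadratic-complexity point can be illustrated with a toy model in plain Scala; nothing here is Flink API, and all names are made up for the sketch. Treat the lazy data flow as the list of operators built so far, and charge one unit of work per operator each time the flow is eagerly executed for a convergence check.]

    // Toy model of loop unrolling: a lazy "pipeline" is just the list of
    // operators built so far. Each convergence check eagerly executes the
    // whole pipeline, costing one unit of work per operator in it.
    var pipeline = List("source")
    var totalWork = 0L
    val iterations = 100

    for (_ <- 1 to iterations) {
      pipeline = "reduce" :: "map" :: pipeline // loop body adds two operators
      totalWork += pipeline.length             // eager eval re-runs everything
    }

    // With 2 operators added per iteration, total work is
    // sum over i of (1 + 2i), i.e. n + n(n+1): quadratic in n.
    println(totalWork) // prints 10200 for n = 100

[With a native iteration, by contrast, each pass would cost a constant number of operator executions, for linear total work.]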
> Therefore, I would highly recommend using Flink's built-in support for
> native iterations, which won't suffer from this problem, or at least
> materializing the intermediate result every n iterations. At the moment
> this would mean writing the data to some sink and then reading it from
> there again.
>
> I hope this answers your question. If not, then don't hesitate to ask me
> again.
>
> Cheers,
> Till
>
> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
> > Hello Dmitriy,
> >
> > If I understood correctly, what you are basically talking about is
> > modifying a DataSet as you iterate over it.
> >
> > AFAIK this is currently not possible in Flink, and indeed it's a real
> > bottleneck for ML algorithms. This is the reason our current SGD
> > implementation does a pass over the whole dataset at each iteration:
> > since we cannot take a sample from the dataset and iterate only over
> > that, it's not really stochastic.
> >
> > The relevant JIRA is here:
> > https://issues.apache.org/jira/browse/FLINK-2396
> >
> > I would love to start a discussion on how we can proceed to fix this.
> >
> > Regards,
> > Theodore
> >
> > On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Probably more of a question for Till:
> > >
> > > Imagine a common ML algorithm flow that runs until convergence.
> > >
> > > A typical distributed flow would be something like this (e.g. GMM EM
> > > would be exactly like that):
> > >
> > >     A: input
> > >
> > >     do {
> > >       stat1 = A.map.reduce
> > >       A = A.update-map(stat1)
> > >       conv = A.map.reduce
> > >     } while (conv > convThreshold)
> > >
> > > There could probably be one map-reduce step originating on A to
> > > compute both the convergence-criterion statistics and the update
> > > statistics in one pass, but that's not the point.
> > >
> > > The point is that the update and the map.reduce originate on the same
> > > dataset intermittently.
> > > In Spark we would normally commit A to an object tree cache so that
> > > the data is available to subsequent map passes without any I/O or
> > > serialization operations, thus ensuring a high rate of iterations.
> > >
> > > We observe the same pattern pretty much everywhere: clustering,
> > > probabilistic algorithms, even batch gradient descent or quasi-Newton
> > > algorithm fitting.
> > >
> > > How do we do something like that, for example, in FlinkML?
> > >
> > > Thoughts?
> > >
> > > Thanks.
> > >
> > > -Dmitriy
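[For a concrete starting point, the loop from the top of the thread can be emulated in plain Scala over a tiny in-memory matrix; the values below are arbitrary, and nothing here uses Flink. In Flink, the same loop body would instead go inside the native iteration operator (e.g. DataSet.iterateWithTermination) so the runtime keeps the intermediate result inside the iteration instead of unrolling the pipeline. Note that subtracting the grand mean makes the next mean exactly zero, so this particular loop terminates after two passes rather than diverging.]

    // Plain-Scala emulation of the Samsara loop from the first message.
    // drmA is a small in-memory stand-in for the distributed matrix.
    var drmA = Array(Array(1.0, 2.0), Array(3.0, 4.0))
    var avg = Double.PositiveInfinity
    var passes = 0

    while (avg > 1e-10) {
      val colMeans = drmA.transpose.map(col => col.sum / col.length)
      avg = colMeans.sum / colMeans.length // mean of column-wise means
      drmA = drmA.map(_.map(_ - avg))      // elementwise subtract
      passes += 1
    }

    println(passes) // prints 2: the mean is exactly 0 after the first subtraction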