Thank you, all :)

Yes, that's my question: how would we construct such a loop, concretely?

Let's take something nonsensical yet specific.

Say, in Samsara terms we would do something like this:

var avg = Double.PositiveInfinity
var drmA = ... // constructed elsewhere

do {
   avg = drmA.colMeans.mean // average of the column-wise means
   drmA = drmA - avg        // elementwise subtraction of the average
} while (avg > 1e-10)

(which probably does not converge in reality).

How would we implement that with native iterations in Flink?
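
Would it be something along these lines with the DataSet API's
iterateWithTermination? (Just a sketch to make the question concrete -- I am
using a plain DataSet[Array[Double]] of dense rows as a stand-in for the DRM,
so the names and details are surely off.)

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object MeanCenterSketch {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // toy stand-in for drmA: a DataSet of dense rows
    val drmA: DataSet[Array[Double]] = env.fromElements(
      Array(1.0, 2.0, 3.0),
      Array(4.0, 5.0, 6.0))

    val centered = drmA.iterateWithTermination(100) { current =>
      // avg: for a dense matrix the mean of the col-wise means equals
      // the mean over all entries
      val avg: DataSet[Double] = current
        .map(row => (row.sum, row.length))
        .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
        .map(t => t._1 / t._2)

      // drmA - avg: ship the scalar to the mappers as a broadcast set
      val next = current
        .map(new RichMapFunction[Array[Double], Array[Double]] {
          private var mean = 0.0
          override def open(parameters: Configuration): Unit = {
            mean = getRuntimeContext.getBroadcastVariable[Double]("avg").get(0)
          }
          override def map(row: Array[Double]): Array[Double] = row.map(_ - mean)
        })
        .withBroadcastSet(avg, "avg")

      // the iteration continues as long as this set is non-empty,
      // i.e. while (avg > 1e-10)
      val keepGoing = avg.filter(_ > 1e-10)

      (next, keepGoing)
    }

    centered.print()
  }
}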



On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Dmitriy,
>
> I’m not sure whether I’ve understood your question correctly, so please
> correct me if I’m wrong.
>
> So you’re asking whether it is a problem that
>
> stat1 = A.map.reduce
> A = A.update.map(stat1)
>
> are executed on the same input data set A and whether we have to cache A
> for that, right? I assume you’re worried that A is calculated twice.
>
> Since you don’t have an API call which triggers eager execution of the data
> flow, the map.reduce and map(stat1) calls will only construct the data flow
> of your program. Both operators will depend on the result of A, which is
> calculated only once (when execute, collect or count is called) and then
> sent to the map.reduce and map(stat1) operators.
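>
> For instance, in a rough sketch with toy data (just to illustrate the
> laziness, not your actual program):
>
> import org.apache.flink.api.scala._
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val A: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0)
>
> // these two lines only add operators to the plan, nothing runs yet
> val stat1 = A.map(x => x * x).reduce(_ + _)                    // "A.map.reduce"
> val updated = A.crossWithTiny(stat1).map { t => t._1 / t._2 }  // "A.update.map(stat1)"
>
> // only now is the plan executed; A is evaluated a single time and its
> // result is streamed to both the reduce and the cross/map above
> val result = updated.collect()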
>
> However, it is not recommended to use an explicit loop for iterative
> computations with Flink. The problem is that you would basically unroll the
> loop and construct a long pipeline containing the operations of every
> iteration. Once you execute this long pipeline you will face considerable
> memory fragmentation, because every operator gets assigned a proportional
> fraction of the available memory. Even worse, if you trigger the execution
> of your data flow to evaluate the convergence criterion, then for each
> iteration you will execute the complete pipeline which has been built up so
> far. Thus, you’ll end up with quadratic complexity in the number of
> iterations. Therefore, I would highly recommend using Flink’s built-in
> support for native iterations, which doesn’t suffer from this problem, or
> at least materializing the intermediate result every n iterations. At the
> moment this would mean writing the data to some sink and then reading it
> from there again.
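>
> The latter, very roughly sketched (placeholder path, a trivial stand-in
> update, and n = 5, just to show the shape):
>
> import org.apache.flink.api.scala._
> import org.apache.flink.core.fs.FileSystem.WriteMode
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tmpDir = "/tmp/flink-iter"                    // placeholder checkpoint dir
>
> var current: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0)
>
> for (i <- 1 to 20) {
>   current = current.map(_ * 0.9)                  // stand-in for one iteration's update
>
>   if (i % 5 == 0) {                               // every n iterations: cut the lineage
>     current.writeAsText(s"$tmpDir/iter-$i", WriteMode.OVERWRITE)
>     env.execute()                                 // materialize the pipeline built so far
>     current = env.readTextFile(s"$tmpDir/iter-$i").map(_.toDouble)
>   }
> }
>
> current.print()                                   // run whatever is left of the pipeline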
>
> I hope this answers your question. If not, then don’t hesitate to ask me
> again.
>
> Cheers,
> Till
>
> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
> > Hello Dmitriy,
> >
> > If I understood correctly, what you are basically talking about is
> > modifying a DataSet as you iterate over it.
> >
> > AFAIK this is currently not possible in Flink, and indeed it's a real
> > bottleneck for ML algorithms. This is the reason our current
> > SGD implementation does a pass over the whole dataset at each iteration,
> > since we cannot take a sample from the dataset
> > and iterate only over that (so it's not really stochastic).
> >
> > The relevant JIRA is here:
> > https://issues.apache.org/jira/browse/FLINK-2396
> >
> > I would love to start a discussion on how we can proceed to fix this.
> >
> > Regards,
> > Theodore
> >
> > On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > probably more of a question for Till:
> > >
> > > Imagine a common ML algorithm flow that runs until convergence.
> > >
> > > A typical distributed flow would be something like this (e.g. GMM EM
> > > would be exactly like this):
> > >
> > > A: input
> > >
> > > do {
> > >
> > >    stat1 = A.map.reduce
> > >    A = A.update-map(stat1)
> > >    conv = A.map.reduce
> > > } while (conv > convThreshold)
> > >
> > > There could probably be a single map-reduce step originating on A that
> > > computes both the convergence criterion statistics and the update
> > > statistics in one pass, but that's not the point.
> > >
> > > The point is that the update and the map.reduce repeatedly originate on
> > > the same dataset.
> > >
> > > In Spark we would normally commit A to an object tree cache so that the
> > > data is available to subsequent map passes without any I/O or
> > > serialization operations, thus ensuring a high rate of iterations.
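> > >
> > > E.g., schematically (toy data, local mode, just to show the caching
> > > pattern):
> > >
> > > import org.apache.spark.{SparkConf, SparkContext}
> > >
> > > val sc = new SparkContext(
> > >   new SparkConf().setAppName("cached-iteration").setMaster("local[*]"))
> > >
> > > var A = sc.parallelize(Seq(1.0, 2.0, 3.0)).cache()  // pin A in the in-memory cache
> > >
> > > var avg = Double.PositiveInfinity
> > > while (math.abs(avg) > 1e-10) {
> > >   avg = A.sum() / A.count()              // stat pass, served from the cache
> > >   val updated = A.map(_ - avg).cache()   // update pass, also served from the cache
> > >   updated.count()                        // materialize before dropping the old copy
> > >   A.unpersist()
> > >   A = updated
> > > }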
> > >
> > > We observe the same pattern pretty much everywhere: clustering,
> > > probabilistic algorithms, even batch gradient descent or quasi-Newton
> > > algorithm fitting.
> > >
> > > How do we do something like that, for example, in FlinkML?
> > >
> > > Thoughts?
> > >
> > > thanks.
> > >
> > > -Dmitriy
> > >
> >
>
