Hi Dmitriy,

I think you can implement it with iterative API with custom convergence 
criterion. You can express the convergence criterion by two methods. One is 
using a convergence criterion data set [1][2] and the other is registering an 
aggregator with custom implementation of `ConvergenceCriterion` interface [3].

Here is an example using a convergence criterion data set in Scala API:

```
package flink.sample

import org.apache.flink.api.scala._

import scala.util.Random

object SampleApp extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  val result = data.iterateWithTermination(5000) { prev =>
    // calculate sub solution
    val rand = Random.nextDouble()
    val subSolution = prev.map(_ * rand)

    // calculate convergent condition
    val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)

    (subSolution, convergence)
  }

  result.print()
}
```

Regards,
Chiwan Park

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
[2]: iterateWithTermination method in 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
[3]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29

> On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
> 
> Thank you, all :)
> 
> yes, that's my question. How do we construct such a loop with a concrete
> example?
> 
> Let's take something nonsensical yet specific.
> 
> Say, in samsara terms we do something like that :
> 
> var avg = Double.PositiveInfinity
> var drmA = ... (construct elsewhere)
> 
> 
> 
> do {
>   avg = drmA.colMeans.mean // average of col-wise means
>   drmA = drmA - avg // elementwise subtract of average
> 
> } while (avg > 1e-10)
> 
> (which probably does not converge in reality).
> 
> How would we implement that with native iterations in flink?
> 
> 
> 
> On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> 
>> Hi Dmitriy,
>> 
>> I’m not sure whether I’ve understood your question correctly, so please
>> correct me if I’m wrong.
>> 
>> So you’re asking whether it is a problem that
>> 
>> stat1 = A.map.reduce
>> A = A.update.map(stat1)
>> 
>> are executed on the same input data set A and whether we have to cache A
>> for that, right? I assume you’re worried that A is calculated twice.
>> 
>> Since you don’t have a API call which triggers eager execution of the data
>> flow, the map.reduce and map(stat1) call will only construct the data flow
>> of your program. Both operators will depend on the result of A which is
>> only once calculated (when execute, collect or count is called) and then
>> sent to the map.reduce and map(stat1) operator.
>> 
>> However, it is not recommended using an explicit loop to do iterative
>> computations with Flink. The problem here is that you will basically unroll
>> the loop and construct a long pipeline with the operations of each
>> iterations. Once you execute this long pipeline you will face considerable
>> memory fragmentation, because every operator will get a proportional
>> fraction of the available memory assigned. Even worse, if you trigger the
>> execution of your data flow to evaluate the convergence criterion, you will
>> execute for each iteration the complete pipeline which has been built up so
>> far. Thus, you’ll end up with a quadratic complexity in the number of
>> iterations. Therefore, I would highly recommend using Flink’s built in
>> support for native iterations which won’t suffer from this problem or to
>> materialize at least for every n iterations the intermediate result. At the
>> moment this would mean to write the data to some sink and then reading it
>> from there again.
>> 
>> I hope this answers your question. If not, then don’t hesitate to ask me
>> again.
>> 
>> Cheers,
>> Till
>> ​
>> 
>> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
>> theodoros.vasilou...@gmail.com> wrote:
>> 
>>> Hello Dmitriy,
>>> 
>>> If I understood correctly what you are basically talking about modifying
>> a
>>> DataSet as you iterate over it.
>>> 
>>> AFAIK this is currently not possible in Flink, and indeed it's a real
>>> bottleneck for ML algorithms. This is the reason our current
>>> SGD implementation does a pass over the whole dataset at each iteration,
>>> since we cannot take a sample from the dataset
>>> and iterate only over that (so it's not really stochastic).
>>> 
>>> The relevant JIRA is here:
>>> https://issues.apache.org/jira/browse/FLINK-2396
>>> 
>>> I would love to start a discussion on how we can proceed to fix this.
>>> 
>>> Regards,
>>> Theodore
>>> 
>>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> probably more of a question for Till:
>>>> 
>>>> Imagine a common ML algorithm flow that runs until convergence.
>>>> 
>>>> typical distributed flow would be something like that (e.g. GMM EM
>> would
>>> be
>>>> exactly like that):
>>>> 
>>>> A: input
>>>> 
>>>> do {
>>>> 
>>>>   stat1 = A.map.reduce
>>>>   A = A.update-map(stat1)
>>>>   conv = A.map.reduce
>>>> } until conv > convThreshold
>>>> 
>>>> There probably could be 1 map-reduce step originating on A to compute
>>> both
>>>> convergence criteria statistics and udpate statistics in one step. not
>>> the
>>>> point.
>>>> 
>>>> The point is that update and map.reduce originate on the same dataset
>>>> intermittently.
>>>> 
>>>> In spark we would normally commit A to a object tree cache so that data
>>> is
>>>> available to subsequent map passes without any I/O or serialization
>>>> operations, thus insuring high rate of iterations.
>>>> 
>>>> We observe the same pattern pretty much everywhere. clustering,
>>>> probabilistic algorithms, even batch gradient descent of quasi newton
>>>> algorithms fitting.
>>>> 
>>>> How do we do something like that, for example, in FlinkML?
>>>> 
>>>> Thoughts?
>>>> 
>>>> thanks.
>>>> 
>>>> -Dmitriy
>>>> 
>>> 
>> 

Reply via email to