@Shannon

What you are describing is available in the DataSet API through the
iterateWithTermination function. See the API docs
<https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#iteration-operators>
and the Iterations page
<https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html>.
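
As a rough illustration of a threshold-based criterion, here is a minimal
sketch of the centering loop quoted further down in this thread, written with
iterateWithTermination. It assumes the Flink 1.0 Scala DataSet API; the object
name, the toy input, and the values of maxIterations and threshold are made up
for the example. A flat DataSet[Double] stands in for the matrix, so this is
not a Samsara or FlinkML implementation.

```scala
import org.apache.flink.api.scala._

// Hypothetical example object; names and constants are illustrative only.
object CenteringUntilConverged {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Toy stand-in for the matrix: a flat data set of values.
    val data: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0, 4.0, 5.0)

    val maxIterations = 100 // "just in case" cut-off
    val threshold = 1e-10   // assumed convergence threshold

    val result = data.iterateWithTermination(maxIterations) { prev =>
      // Single-element data set holding the mean of the current values.
      val mean = prev
        .map(x => (x, 1L))
        .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
        .map(t => t._1 / t._2)

      // Subtract the mean from every element.
      val next = prev.crossWithTiny(mean) { (x, m) => x - m }

      // Termination criterion: the iteration continues only while this
      // data set is non-empty, i.e. while |mean| is above the threshold.
      val notConverged = mean.filter(m => math.abs(m) > threshold)

      (next, notConverged)
    }

    // Nothing runs until this call; the whole loop executes as one job.
    result.print()
  }
}
```

A squared-loss check should slot in the same way: compute the loss as a
single-element data set inside the step function and filter it against the
threshold. The maxIterations argument still acts as the safety cut-off.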

On Tue, Mar 29, 2016 at 3:14 PM, Shannon Quinn <squ...@gatech.edu> wrote:

> Apologies for hijacking, but this thread hits right at my last message to
> this list (looking to implement native iterations in the PyFlink API).
>
> I'm particularly interested in custom convergence criteria, often centered
> around measuring some sort of squared loss and checking if it falls below a
> threshold. Is this what you mean by a "dynamic convergence criterion"?
> Certainly having a max-iterations cut-off as a "just in case" measure is a
> good thing, but I'm curious if there's a native way of using a
> threshold-based criterion that doesn't involve simply iterating 10 or so
> times, checking the criterion, and iterating some more.
>
> Shannon
>
>
> On 3/29/16 5:53 AM, Till Rohrmann wrote:
>
>> Hi,
>>
>> Chiwan’s example is perfectly fine and it should also work with general EM
>> algorithms. Moreover, it is the recommended way to implement iterations
>> with Flink. The iterateWithTermination API call generates a lazily
>> evaluated data flow with an iteration operator. This plan will only be
>> executed when you call env.execute, or a collect or count that depends on
>> this data flow. In the example it would be triggered by result.print. You
>> can also take a look at the KMeans implementation of Flink. It does not use
>> a dynamic convergence criterion, but one could easily be added.
>>
>> If you really need to trigger the execution of the data flow for each
>> iteration (e.g. because you have different data flows depending on the
>> result), then you should persist the intermediate result every n
>> iterations. Otherwise you will re-trigger the execution of previous
>> operators over and over.
>>
>> Cheers,
>> Till
>>
>>
>> On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dlie...@gmail.com>
>> wrote:
>>
>>> Thanks Chiwan.
>>>
>>> I think this example still creates a lazily evaluated plan. What if I need
>>> to collect statistics to the front end (and use them in the subsequent
>>> iteration evaluation), as my example with computing column-wise averages
>>> suggests?
>>>
>>> The problem generally is: what if I need to eagerly evaluate the statistics
>>> inside the iteration in order to proceed with further computations (and
>>> even plan construction)? Typically, that would be the result of the M-step
>>> in an EM algorithm.
>>>
>>> On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <chiwanp...@apache.org>
>>> wrote:
>>>
>>>> Hi Dmitriy,
>>>>
>>>> I think you can implement it with the iterative API and a custom
>>>> convergence criterion. You can express the convergence criterion in two
>>>> ways. One is using a convergence criterion data set [1][2], and the other
>>>> is registering an aggregator with a custom implementation of the
>>>> `ConvergenceCriterion` interface [3].
>>>>
>>>> Here is an example using a convergence criterion data set in the Scala API:
>>>>
>>>> ```
>>>> package flink.sample
>>>>
>>>> import org.apache.flink.api.scala._
>>>>
>>>> import scala.util.Random
>>>>
>>>> object SampleApp extends App {
>>>>    val env = ExecutionEnvironment.getExecutionEnvironment
>>>>
>>>>    val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>>>
>>>>    val result = data.iterateWithTermination(5000) { prev =>
>>>>      // calculate sub solution
>>>>      val rand = Random.nextDouble()
>>>>      val subSolution = prev.map(_ * rand)
>>>>
>>>>      // calculate convergent condition
>>>>      val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)
>>>>
>>>>      (subSolution, convergence)
>>>>    }
>>>>
>>>>    result.print()
>>>> }
>>>> ```
>>>>
>>>> Regards,
>>>> Chiwan Park
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
>>>> [2]: iterateWithTermination method in
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
>>>> [3]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
>>>
>>>> On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>>>>
>>>>> Thank you, all :)
>>>>>
>>>>> Yes, that's my question. How do we construct such a loop with a concrete
>>>>> example?
>>>>>
>>>>> Let's take something nonsensical yet specific.
>>>>>
>>>>> Say, in samsara terms we do something like that :
>>>>>
>>>>> var avg = Double.PositiveInfinity
>>>>> var drmA = ... (construct elsewhere)
>>>>>
>>>>>
>>>>>
>>>>> do {
>>>>>    avg = drmA.colMeans.mean // average of col-wise means
>>>>>    drmA = drmA - avg // elementwise subtract of average
>>>>>
>>>>> } while (avg > 1e-10)
>>>>>
>>>>> (which probably does not converge in reality).
>>>>>
>>>>> How would we implement that with native iterations in flink?
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Dmitriy,
>>>>>>
>>>>>> I’m not sure whether I’ve understood your question correctly, so please
>>>>>> correct me if I’m wrong.
>>>>>>
>>>>>> So you’re asking whether it is a problem that
>>>>>>
>>>>>> stat1 = A.map.reduce
>>>>>> A = A.update.map(stat1)
>>>>>>
>>>>>> are executed on the same input data set A and whether we have to cache A
>>>>>> for that, right? I assume you’re worried that A is calculated twice.
>>>>>>
>>>>>> Since you don’t have an API call which triggers eager execution of the
>>>>>> data flow, the map.reduce and map(stat1) calls will only construct the
>>>>>> data flow of your program. Both operators will depend on the result of A,
>>>>>> which is calculated only once (when execute, collect or count is called)
>>>>>> and then sent to the map.reduce and map(stat1) operators.
>>>>>>
>>>>>> However, using an explicit loop for iterative computations is not
>>>>>> recommended with Flink. The problem is that you will basically unroll
>>>>>> the loop and construct a long pipeline containing the operations of
>>>>>> every iteration. Once you execute this long pipeline you will face
>>>>>> considerable memory fragmentation, because every operator gets a
>>>>>> proportional fraction of the available memory assigned. Even worse, if
>>>>>> you trigger the execution of your data flow to evaluate the convergence
>>>>>> criterion, each iteration will execute the complete pipeline that has
>>>>>> been built up so far. Thus, you’ll end up with quadratic complexity in
>>>>>> the number of iterations. Therefore, I would highly recommend either
>>>>>> using Flink’s built-in support for native iterations, which doesn’t
>>>>>> suffer from this problem, or materializing the intermediate result at
>>>>>> least every n iterations. At the moment this would mean writing the
>>>>>> data to some sink and then reading it back from there.
>>>>>>
>>>>>> I hope this answers your question. If not, then don’t hesitate to ask me
>>>>>> again.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
>>>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Dmitriy,
>>>>>>>
>>>>>>> If I understood correctly, what you are basically talking about is
>>>>>>> modifying a DataSet as you iterate over it.
>>>>>>>
>>>>>>> AFAIK this is currently not possible in Flink, and indeed it's a real
>>>>>>> bottleneck for ML algorithms. This is the reason our current
>>>>>>> SGD implementation does a pass over the whole dataset at each iteration,
>>>>>>> since we cannot take a sample from the dataset
>>>>>>> and iterate only over that (so it's not really stochastic).
>>>>>>>
>>>>>>> The relevant JIRA is here:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-2396
>>>>>>>
>>>>>>> I would love to start a discussion on how we can proceed to fix this.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Theodore
>>>>>>>
>>>>>>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> probably more of a question for Till:
>>>>>>>>
>>>>>>>> Imagine a common ML algorithm flow that runs until convergence.
>>>>>>>>
>>>>>>>> A typical distributed flow would be something like this (e.g. GMM EM
>>>>>>>> would be exactly like this):
>>>>>>>>
>>>>>>>> A: input
>>>>>>>>
>>>>>>>> do {
>>>>>>>>
>>>>>>>>    stat1 = A.map.reduce
>>>>>>>>    A = A.update-map(stat1)
>>>>>>>>    conv = A.map.reduce
>>>>>>>> } until conv > convThreshold
>>>>>>>>
>>>>>>>> There probably could be one map-reduce step originating on A to compute
>>>>>>>> both the convergence criteria statistics and the update statistics in
>>>>>>>> one step. Not the point.
>>>>>>>>
>>>>>>>> The point is that update and map.reduce originate on the same dataset
>>>>>>>> intermittently.
>>>>>>>>
>>>>>>>> In Spark we would normally commit A to an object tree cache so that data
>>>>>>>> is available to subsequent map passes without any I/O or serialization
>>>>>>>> operations, thus ensuring a high rate of iterations.
>>>>>>>>
>>>>>>>> We observe the same pattern pretty much everywhere: clustering,
>>>>>>>> probabilistic algorithms, even batch gradient descent of quasi-Newton
>>>>>>>> algorithms fitting.
>>>>>>>>
>>>>>>>> How do we do something like that, for example, in FlinkML?
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>> thanks.
>>>>>>>>
>>>>>>>> -Dmitriy
>>>>>>>>
>>>>>>>>
>>>>
>
