Thanks Chiwan.

I think this example still creates a lazily evaluated plan. What if I need to collect statistics to the front end (and use them in subsequent iteration evaluation), as my example with computing column-wise averages suggests?

The general problem is: what if I need to eagerly evaluate the statistics inside the iteration in order to proceed with further computations (and even plan construction)? Typically, that would be the result of the M-step in an EM algorithm.
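To make that concrete, here is a rough, untested sketch of the pattern I mean (placeholder data and threshold): the statistic has to be pulled to the front end with collect() on every pass before the next part of the plan can even be constructed.

```
import org.apache.flink.api.scala._

object EagerStatSketch extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  // placeholder input standing in for the real data set
  var drmA: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0, 4.0, 5.0)

  var converged = false
  while (!converged) {
    // "M-step": compute a statistic and bring it to the driver eagerly;
    // collect() triggers execution of the whole plan built so far
    val values = drmA.collect()
    val avg = values.sum / values.size

    // the eagerly computed statistic then drives further plan construction
    drmA = drmA.map(_ - avg)
    converged = math.abs(avg) <= 1e-10
  }
}
```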
On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <chiwanp...@apache.org> wrote:
Hi Dmitriy,

I think you can implement it with the iterative API with a custom convergence criterion. You can express the convergence criterion in two ways: one is using a convergence criterion data set [1][2], and the other is registering an aggregator with a custom implementation of the `ConvergenceCriterion` interface [3].
Here is an example using a convergence criterion data set in the Scala API:

```
package flink.sample

import org.apache.flink.api.scala._

import scala.util.Random

object SampleApp extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  val result = data.iterateWithTermination(5000) { prev =>
    // calculate sub-solution
    val rand = Random.nextDouble()
    val subSolution = prev.map(_ * rand)

    // calculate convergence condition
    val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)

    (subSolution, convergence)
  }

  result.print()
}
```
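Note that the iteration stops as soon as the convergence data set (the second element of the returned tuple) is empty at the end of an iteration, or once the maximum number of iterations (5000 here) is reached.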
Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
[2]: the iterateWithTermination method in https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
[3]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
Thank you, all :)

Yes, that's my question. How do we construct such a loop with a concrete example?

Let's take something nonsensical yet specific. Say, in Samsara terms we do something like this:

var avg = Double.PositiveInfinity
var drmA = ... (constructed elsewhere)

do {
  avg = drmA.colMeans.mean  // average of col-wise means
  drmA = drmA - avg         // element-wise subtraction of the average
} while (avg > 1e-10)

(which probably does not converge in reality).

How would we implement that with native iterations in Flink?
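For illustration, here is a rough, untested sketch of how that loop might map onto iterateWithTermination, assuming drmA is represented as a DataSet[Array[Double]] of equally sized dense rows; the data, the iteration cap and the 1e-10 threshold are placeholders:

```
import org.apache.flink.api.scala._

object ColMeansIterationSketch extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  // placeholder stand-in for drmA: a data set of dense rows
  val drmA: DataSet[Array[Double]] = env.fromElements(
    Array(1.0, 2.0, 3.0),
    Array(4.0, 5.0, 6.0))

  val result = drmA.iterateWithTermination(5000) { prev =>
    // for equally sized rows, the mean of the column-wise means equals
    // (sum of all elements) / (number of elements)
    val avg: DataSet[Double] = prev
      .map(row => (row.sum, row.length.toLong))
      .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case (sum, count) => sum / count }

    // element-wise subtraction of the scalar average from every row
    val next = prev.crossWithTiny(avg).map { case (row, a) => row.map(_ - a) }

    // keep iterating while avg > 1e-10; the iteration stops once this set is empty
    val termination = avg.filter(_ > 1e-10)

    (next, termination)
  }

  result.map(_.mkString(", ")).print()
}
```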
On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrm...@apache.org> wrote:
Hi Dmitriy,

I'm not sure whether I've understood your question correctly, so please correct me if I'm wrong.

So you're asking whether it is a problem that

stat1 = A.map.reduce
A = A.update.map(stat1)

are executed on the same input data set A, and whether we have to cache A for that, right? I assume you're worried that A is calculated twice.

Since you don't have an API call which triggers eager execution of the data flow, the map.reduce and map(stat1) calls will only construct the data flow of your program. Both operators will depend on the result of A, which is calculated only once (when execute, collect or count is called) and then sent to the map.reduce and map(stat1) operators.
However, it is not recommended to use an explicit loop to do iterative computations with Flink. The problem is that you will basically unroll the loop and construct a long pipeline with the operations of each iteration. Once you execute this long pipeline you will face considerable memory fragmentation, because every operator will get a proportional fraction of the available memory assigned. Even worse, if you trigger the execution of your data flow to evaluate the convergence criterion, you will execute the complete pipeline built up so far for each iteration. Thus, you'll end up with quadratic complexity in the number of iterations. Therefore, I would highly recommend using Flink's built-in support for native iterations, which doesn't suffer from this problem, or at least materializing the intermediate result every n iterations. At the moment this would mean writing the data to some sink and then reading it back from there.
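For example, roughly like this (an untested sketch with a placeholder checkpoint path; the per-iteration step is just a dummy map):

```
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object MaterializeEveryN extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val checkpointPath = "/tmp/iteration-checkpoint" // placeholder location
  val n = 10

  var a: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0, 4.0, 5.0)

  for (i <- 1 to 100) {
    // one explicit iteration step: every step extends the unrolled plan further
    a = a.map(_ * 0.9)

    if (i % n == 0) {
      // cut the growing pipeline: write the current intermediate result out ...
      a.writeAsText(s"$checkpointPath-$i", WriteMode.OVERWRITE)
      env.execute(s"materialize iteration $i")
      // ... and read it back, so the following steps start from a short plan again
      a = env.readTextFile(s"$checkpointPath-$i").map(_.toDouble)
    }
  }

  a.print()
}
```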
I hope this answers your question. If not, then don't hesitate to ask me again.
Cheers,
Till
On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
Hello Dmitriy,

If I understood correctly, what you are basically talking about is modifying a DataSet as you iterate over it.

AFAIK this is currently not possible in Flink, and indeed it's a real bottleneck for ML algorithms. This is the reason our current SGD implementation does a pass over the whole dataset at each iteration, since we cannot take a sample from the dataset and iterate only over that (so it's not really stochastic).
The relevant JIRA is here:
https://issues.apache.org/jira/browse/FLINK-2396
I would love to start a discussion on how we can proceed to fix this.
Regards,
Theodore
On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
Hi,

probably more of a question for Till:

Imagine a common ML algorithm flow that runs until convergence. A typical distributed flow would be something like this (e.g. GMM EM would be exactly like that):

A: input

do {
  stat1 = A.map.reduce
  A = A.update-map(stat1)
  conv = A.map.reduce
} until conv > convThreshold
There could probably be one map-reduce step originating on A to compute both the convergence criterion statistics and the update statistics in one pass; that's not the point. The point is that the update and the map.reduce originate on the same dataset intermittently.

In Spark we would normally commit A to an object tree cache so that the data is available to subsequent map passes without any I/O or serialization operations, thus ensuring a high rate of iterations.
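Roughly like this in Spark terms (an untested sketch with placeholder data, where the statistic is just a mean):

```
import org.apache.spark.{SparkConf, SparkContext}

object SparkCachedIterationSketch extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("cached-iteration").setMaster("local[*]"))

  // A stays cached in executor memory, so every pass avoids re-reading the input
  var a = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0, 5.0)).cache()

  var conv = Double.PositiveInfinity
  while (conv > 1e-10) {
    // stat1 = A.map.reduce, evaluated eagerly against the cached data
    val stat1 = a.reduce(_ + _) / a.count()

    // A = A.update-map(stat1); cache the updated data set for the next pass
    val updated = a.map(_ - stat1).cache()
    updated.count()   // force materialization of the new cached data set
    a.unpersist()
    a = updated

    conv = math.abs(stat1)
  }

  sc.stop()
}
```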
We observe the same pattern pretty much everywhere: clustering, probabilistic algorithms, even batch gradient descent or quasi-Newton algorithm fitting.
How do we do something like that, for example, in FlinkML?
Thoughts?
thanks.
-Dmitriy