On Thu, Jan 14, 2016 at 5:52 AM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Kovas
>
> sorry for the long delay.
>

It was worth the wait! Thanks for the detailed response.

> Ideally, I could force certain ResultPartitions to only be manually
> releasable, so I can consume them over and over.
>
> How would you like to have this controlled by the user? Offer an API
> operation like `.pin()`? Do you need it pinned permanently until you
> release it or would it be ok to just cache it and maybe recompute if
> another task needs more memory/disk space?
>

If another task needs resources, I would expect cached data to be moved to
lower levels of the storage hierarchy. We don't want to block or degrade new
computations, because it is impossible to tell the user upfront whether their
new request will exceed the available resources.

I think extending Tachyon's approach might be interesting. Currently, it has
up to three tiers: memory, local disk, and a durable "under store" (HDFS, S3,
etc.). One could add a fourth possibility, call it "null", which would mean:
just drop the data and recompute it if needed. Then one could specify a policy
as a list like [memory, null] or [memory, local disk, null] and have pretty
reasonable control.
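
To make that concrete, here's a rough sketch of what such a policy could look
like. The names (StorageTier, CachePolicy) are invented for illustration only
and don't correspond to anything in Flink or Tachyon:

    // Hypothetical sketch only -- nothing here exists in Flink or Tachyon.
    // The idea: a per-dataset eviction policy is an ordered list of tiers, and
    // NONE (the "null" tier above) means "drop the data, recompute from the plan".
    import java.util.Arrays;
    import java.util.List;

    enum StorageTier { MEMORY, LOCAL_DISK, UNDER_STORE, NONE }

    class CachePolicy {
        final List<StorageTier> tiers;

        CachePolicy(StorageTier... tiers) {
            this.tiers = Arrays.asList(tiers);
        }
    }

    // [memory, null]:
    //   new CachePolicy(StorageTier.MEMORY, StorageTier.NONE)
    // [memory, local disk, null]:
    //   new CachePolicy(StorageTier.MEMORY, StorageTier.LOCAL_DISK, StorageTier.NONE)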

However it's done, I hope it's in the nice compositional, interface-driven
style that Flink enjoys today. "Simple" is much better than "easy" for me.
There are a bunch of questions, and having a well-thought-out architecture for
achieving different behaviors seems more valuable to me than any specific
top-level API approach.

One issue/question:

How would the user refer to an existing dataset in subsequent operations?
When you create a dataset at the top-level API, is it automatically
assigned an ID?

The separation of logical and physical plans is a Good Thing. When submitting
additional plans, one needs a way to refer to nodes from previous ones, and
whether nodes are assigned identity by value or by reference makes a
difference.
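
Purely as a strawman, building on the .pin() idea above (pin(), getPinned(),
and PinnedDataSetID below are made up; the rest is the ordinary DataSet API),
this is the kind of thing I have in mind:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Integer> squares = env.fromElements(1, 2, 3, 4).map(i -> i * i);

    // Hypothetical: pinning returns a stable ID for the produced result partition(s).
    PinnedDataSetID id = squares.pin();

    // Hypothetical: a later plan, submitted separately (e.g. from the REPL),
    // re-attaches to the cached result by that ID instead of recomputing it.
    DataSet<Integer> cached = env.getPinned(id, Integer.class);
    cached.reduce((a, b) -> a + b).print();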

>
> To get the data you need a SingleInputGate and set it up with
> RemoteChannel instances for each consumed subpartition. This is where you
> need to know all the partition IDs and task managers (The connection ID is
> a wrapper around InetSocketAddress with a connection index for connection
> sharing).


> When you have the gate setup, you can query it with the RecordReader. The
> input gate itself just returns byte buffers.
>

This part is fairly well described in the docs, and I was able to more or
less figure it out. Cool stuff.
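
For anyone following along, the consuming side ends up looking roughly like the
sketch below. I'm eliding the SingleInputGate / RemoteInputChannel construction
itself, since those constructors are internal and version-dependent, and
taskManagerHost / dataPort / the partition ID are placeholders I'd have to dig
out of the execution graph:

    import java.net.InetSocketAddress;

    import org.apache.flink.runtime.io.network.ConnectionID;
    import org.apache.flink.runtime.io.network.api.reader.RecordReader;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
    import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    import org.apache.flink.types.IntValue;

    // Per consumed subpartition: where the producing task runs and which
    // result partition it wrote (both come from the execution graph).
    ConnectionID connectionId =
        new ConnectionID(new InetSocketAddress(taskManagerHost, dataPort), 0);
    ResultPartitionID partitionId = ...; // from the producer's execution vertex

    // A SingleInputGate wired up with one RemoteInputChannel per consumed
    // subpartition; construction elided here.
    InputGate gate = ...;

    // The gate itself only hands out byte buffers; RecordReader turns them
    // back into records of the given type.
    RecordReader<IntValue> reader = new RecordReader<>(gate, IntValue.class);
    while (reader.hasNext()) {
        IntValue value = reader.next();
        System.out.println(value.getValue());
    }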


> Do you have an idea about how you want to figure out the partition and
> connection IDs in your repl process? If yes, I could provide some more
> concrete code snippets on the setup.
>

For hacking purposes my plan was to manually instantiate the execution
graph and instrument some of the nodes so I can get information out. My
REPL would run in the same process as the JobManager.

For a "real" solution, the REPL needs seem related to the WebUI, which I
haven't studied yet. One would want a fairly detailed view into the running
execution graph, possibly but not necessarily as an HTTP api.

I haven't studied how the execution graph is instantiated yet; again, I'd
rather inject this logic via composition than have it hard-coded into the
existing implementation. I will have to study more.

Thanks for the pointers!




>
> ---
>
> If you would like to contribute to Flink, we can also think about splitting
> this up into some smaller tasks and addressing them. Your ideas are definitely
> in line with what we wanted to have in Flink anyways. The recent focus on
> the streaming part of the system has pulled most contributors away from the
> batch parts though. I would suggest to also look at the changes in #640.
> Although the PR is rather old, the network stack has not seen many changes
> in recent times.
>
> Feel free to post further questions! :)
>
> – Ufuk
>
>
