> On 14 Jan 2016, at 22:00, kovas boguta <kovas.bog...@gmail.com> wrote:
> 
> On Thu, Jan 14, 2016 at 5:52 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Kovas
> 
> sorry for the long delay.
> 
> It was worth the wait! Thanks for the detailed response. 
> 
> > Ideally, I could force certain ResultPartitions to only be manually 
> > releasable, so I can consume them over and over.
> 
> How would you like to have this controlled by the user? Offer an API operation 
> like `.pin()`? Do you need it pinned permanently until you release it, or 
> would it be OK to just cache it and maybe recompute if another task needs 
> more memory/disk space?
> 
> If another task needs resources, I would expect cached data to be moved to 
> lower levels of the storage hierarchy. We don't want to block/degrade new 
> computations, because it is impossible to tell the user upfront that their 
> new request will exceed the available resources. 
> 
> I think extending the approach of Tachyon might be interesting.  Currently, 
> they have up to 3 tiers: memory, local disk, durable "understore" (hdfs, s3, 
> etc). One could add a fourth possibility, call it "null" which would mean, 
> just drop data and recompute if needed. Then, one could specify a policy with 
> a list like [memory, null] or [memory, local disk, null] and have pretty 
> reasonable control. 

This is in line with the community's plans for intermediate results. :)
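The tier-policy idea quoted above ([memory, null], [memory, local disk, null], etc.) could be modeled as an ordered list that is walked on memory pressure. A minimal, self-contained sketch, purely illustrative (the `Tier` enum and `StoragePolicy` class are hypothetical, not Flink or Tachyon API):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical tiers; NULL_TIER means "drop the data and recompute if needed".
enum Tier { MEMORY, LOCAL_DISK, UNDERSTORE, NULL_TIER }

// Illustrative policy: an ordered list of tiers walked on memory pressure.
class StoragePolicy {
    private final List<Tier> tiers;

    StoragePolicy(Tier... tiers) {
        this.tiers = Arrays.asList(tiers);
    }

    // Returns the tier to spill to when pressure hits `current`, falling
    // back to NULL_TIER (drop and recompute) at the end of the list.
    Tier nextOnPressure(Tier current) {
        int i = tiers.indexOf(current);
        if (i < 0 || i == tiers.size() - 1) {
            return Tier.NULL_TIER;
        }
        return tiers.get(i + 1);
    }
}
```

With a policy of [MEMORY, LOCAL_DISK, NULL_TIER], pressure on cached in-memory data would spill it to local disk; pressure on disk would drop it for recomputation, so new computations are never blocked.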

> However it's done, I hope it's in the nice compositional, interface-driven 
> style that Flink enjoys today. "Simple" is much better than "easy" for me. 
> There are a bunch of questions, and having a well-thought-out architecture to 
> achieve different behaviors seems like the most valuable thing to me vs. any 
> specific top-level API approach. 
> 
> One issue/question: 
> 
> How would the user refer to an existing dataset in subsequent operations? 
> When you create a dataset at the top-level API, is it automatically assigned 
> an ID?

All IDs and configuration are assigned in the JobGraph structure. It is a 
non-parallel representation of the data flow (of the execution graph).

> The separation of logical & physical plan is a Good Thing. When one is 
> submitting additional plans, one needs to refer to previous nodes. Whether 
> nodes are assigned identity by value versus by reference makes a difference. 

The new graph simply refers to the logical ID and the job manager attaches it 
to the old graph. The connection happens by specifying the ID of a previous 
result as input to a job vertex.
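The attach-by-ID mechanism can be sketched with a simplified model: a producer registers an intermediate result under a logical ID, and a later job vertex is wired up by naming that ID as its input. This is a hypothetical miniature (`MiniJobGraph`, `Vertex`, and the string IDs are illustrative, not the actual Flink JobGraph API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of attaching a new vertex to a cached
// intermediate result via its logical ID (not the real Flink JobGraph API).
class MiniJobGraph {
    static class Vertex {
        final String name;
        String inputResultId; // logical ID of the intermediate result consumed
        Vertex(String name) { this.name = name; }
    }

    // Maps logical result IDs to the vertex that produced them.
    private final Map<String, Vertex> producers = new HashMap<>();

    // Register the intermediate result produced by `v` under `resultId`.
    void registerResult(String resultId, Vertex v) {
        producers.put(resultId, v);
    }

    // Attach a new vertex that consumes a previously registered result by ID.
    Vertex attachConsumer(String name, String resultId) {
        if (!producers.containsKey(resultId)) {
            throw new IllegalArgumentException("Unknown result: " + resultId);
        }
        Vertex v = new Vertex(name);
        v.inputResultId = resultId;
        return v;
    }
}
```

The point of the sketch: a second submitted plan never needs a reference to the old vertex objects, only the stable logical ID, which is what lets the job manager stitch the new graph onto the old one.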

> To get the data you need a SingleInputGate and set it up with RemoteChannel 
> instances for each consumed subpartition. This is where you need to know all 
> the partition IDs and task managers (The connection ID is a wrapper around 
> InetSocketAddress with a connection index for connection sharing). 
> 
> When you have the gate setup, you can query it with the RecordReader. The 
> input gate itself just returns byte buffers.
> 
> This part is fairly well described in the docs, and I was able to more or 
> less figure it out. Cool stuff. 
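The gate/reader split described above (the gate returns raw byte buffers, the reader turns them into records) can be illustrated with a self-contained sketch. This deserializes length-prefixed UTF-8 strings from a buffer; it is only a stand-in for what `RecordReader` does on top of `SingleInputGate` (the real implementation handles spanning records, serializers, and backpressure):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative reader: the "gate" hands out raw ByteBuffers; the reader
// turns them into records (here: length-prefixed UTF-8 strings).
class SimpleRecordReader {
    List<String> readRecords(ByteBuffer buffer) {
        List<String> records = new ArrayList<>();
        while (buffer.remaining() >= Integer.BYTES) {
            int len = buffer.getInt();      // 4-byte length prefix
            byte[] bytes = new byte[len];
            buffer.get(bytes);              // record payload
            records.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return records;
    }
}
```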

In general, for which parts would you like to see more documentation?

> Do you have an idea about how you want to figure out the partition and 
> connection IDs in your repl process? If yes, I could provide some more 
> concrete code snippets on the setup.
> 
> For hacking purposes my plan was to manually instantiate the execution graph 
> and instrument some of the nodes so I can get information out. My REPL would 
> run in the same process as the JobManager. 
> 
> For a "real" solution, the REPL needs seem related to the WebUI, which I 
> haven't studied yet. One would want a fairly detailed view into the running 
> execution graph, possibly but not necessarily as an HTTP API. 
> 
> I haven't studied how the execution graph is instantiated yet, again I'd 
> rather inject this logic via composition than have it hard-coded into the 
> existing implementation. Will have to study more. 

OK. In any case, feel free to post more questions here.

– Ufuk
