> On 14 Jan 2016, at 22:00, kovas boguta <kovas.bog...@gmail.com> wrote:
>
> > On Thu, Jan 14, 2016 at 5:52 AM, Ufuk Celebi <u...@apache.org> wrote:
> >
> > Hey Kovas,
> >
> > sorry for the long delay.
>
> It was worth the wait! Thanks for the detailed response.
>
> > > Ideally, I could force certain ResultPartitions to only be manually
> > > releasable, so I can consume them over and over.
> >
> > How would you like to have this controlled by the user? Offer an API
> > operation like `.pin()`? Do you need it pinned permanently until you
> > release it, or would it be OK to just cache it and maybe recompute it if
> > another task needs more memory/disk space?
>
> If another task needs resources, I would expect cached data to be moved to
> lower levels of the storage hierarchy. We don't want to block/degrade new
> computations, because it is impossible to tell the user upfront that their
> new request will exceed the available resources.
>
> I think extending the approach of Tachyon might be interesting. Currently,
> it has up to three tiers: memory, local disk, and a durable "understore"
> (HDFS, S3, etc.). One could add a fourth possibility, call it "null", which
> would mean: just drop the data and recompute it if needed. Then, one could
> specify a policy with a list like [memory, null] or [memory, local disk,
> null] and have pretty reasonable control.
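The tier-list policy described above could be sketched roughly as follows. This is a minimal, hypothetical illustration: `StorageTier`, `RetentionPolicy`, and `demoteFrom` are invented names for this sketch, not Flink or Tachyon API.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical storage tiers for a cached intermediate result. */
enum StorageTier {
    MEMORY,     // keep partition buffers in managed memory
    LOCAL_DISK, // spill to the task manager's local disk
    UNDERSTORE, // durable store such as HDFS or S3
    NULL        // drop the data and recompute it on demand
}

/** A retention policy is an ordered list of fallback tiers, e.g. [memory, local disk, null]. */
final class RetentionPolicy {
    private final List<StorageTier> tiers;

    RetentionPolicy(StorageTier... tiers) {
        this.tiers = Arrays.asList(tiers);
    }

    /** The next tier to demote to when the current one is under resource pressure. */
    StorageTier demoteFrom(StorageTier current) {
        int i = tiers.indexOf(current);
        // Fall through to recomputation if no lower tier is configured.
        return (i >= 0 && i + 1 < tiers.size()) ? tiers.get(i + 1) : StorageTier.NULL;
    }
}

public class RetentionPolicyExample {
    public static void main(String[] args) {
        // [memory, null]: under memory pressure, drop and recompute.
        RetentionPolicy memOnly = new RetentionPolicy(StorageTier.MEMORY, StorageTier.NULL);
        // [memory, local disk, null]: spill first, recompute only as a last resort.
        RetentionPolicy spilling = new RetentionPolicy(
                StorageTier.MEMORY, StorageTier.LOCAL_DISK, StorageTier.NULL);

        System.out.println(memOnly.demoteFrom(StorageTier.MEMORY));  // NULL
        System.out.println(spilling.demoteFrom(StorageTier.MEMORY)); // LOCAL_DISK
    }
}
```

The point of the list form is that recomputation becomes just another tier, so "cache but never block new work" falls out of the ordering rather than needing a special pinning mode.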
This is in line with the plans the community has for the intermediate results in the future. :)

> However it's done, I hope it's in the nice compositional, interface-driven
> style that Flink enjoys today. "Simple" is much better than "easy" for me.
> There are a bunch of questions, and having a well-thought-out architecture to
> achieve different behaviors seems like the most valuable thing to me vs. any
> specific top-level API approach.
>
> One issue/question:
>
> How would the user refer to an existing dataset in subsequent operations?
> When you create a dataset at the top-level API, is it automatically assigned
> an ID?

All IDs and configuration are assigned in the JobGraph structure. It is a
non-parallel representation of the data flow (of the execution graph).

> The separation of logical & physical plan is a Good Thing. When one is
> submitting additional plans, one needs to refer to previous nodes. Whether
> nodes are assigned identity by value versus by reference makes a difference.

The new graph simply refers to the logical ID, and the job manager attaches it
to the old graph. The connection happens by specifying the ID of a previous
result as input to a job vertex.

> > To get the data you need a SingleInputGate and set it up with RemoteChannel
> > instances for each consumed subpartition. This is where you need to know
> > all the partition IDs and task managers (the connection ID is a wrapper
> > around InetSocketAddress with a connection index for connection sharing).
> >
> > When you have the gate set up, you can query it with the RecordReader. The
> > input gate itself just returns byte buffers.
>
> This part is fairly well described in the docs, and I was able to more or
> less figure it out. Cool stuff.

In general, for which parts would you like to see more documentation?

> > Do you have an idea about how you want to figure out the partition and
> > connection IDs in your REPL process? If yes, I could provide some more
> > concrete code snippets on the setup.
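As a rough illustration of the gate setup quoted above, here is a self-contained sketch. The class names echo Flink's runtime internals (SingleInputGate, RemoteChannel, ConnectionID), but the constructors and methods below are simplified stand-ins written for this sketch, not the real runtime API; only the ID plumbing is modeled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for Flink's ConnectionID: a remote address plus an index for connection sharing. */
final class ConnectionID {
    final InetSocketAddress address;
    final int connectionIndex;

    ConnectionID(InetSocketAddress address, int connectionIndex) {
        this.address = address;
        this.connectionIndex = connectionIndex;
    }
}

/** Stand-in for a remote channel: one consumed subpartition and its producer's location. */
final class RemoteChannel {
    final String partitionId;
    final ConnectionID producer;

    RemoteChannel(String partitionId, ConnectionID producer) {
        this.partitionId = partitionId;
        this.producer = producer;
    }
}

/** Stand-in for SingleInputGate: collects one channel per consumed subpartition. */
final class SingleInputGate {
    private final List<RemoteChannel> channels = new ArrayList<>();

    void addChannel(RemoteChannel channel) {
        channels.add(channel);
    }

    int numberOfChannels() {
        return channels.size();
    }
}

public class GateSetupSketch {
    public static void main(String[] args) {
        SingleInputGate gate = new SingleInputGate();

        // One remote channel per consumed subpartition: the consumer must know
        // every partition ID and the task manager address that produced it.
        gate.addChannel(new RemoteChannel("partition-0",
                new ConnectionID(new InetSocketAddress("tm-1", 6121), 0)));
        gate.addChannel(new RemoteChannel("partition-1",
                new ConnectionID(new InetSocketAddress("tm-2", 6121), 0)));

        // A RecordReader would now wrap the gate and deserialize the raw
        // byte buffers it returns into records.
        System.out.println("channels = " + gate.numberOfChannels());
    }
}
```

The hard part for an external consumer such as a REPL is not this wiring but discovering the partition IDs and producer addresses in the first place, which is exactly the question raised above.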
> For hacking purposes, my plan was to manually instantiate the execution
> graph and instrument some of the nodes so I can get information out. My
> REPL would run in the same process as the JobManager.
>
> For a "real" solution, the REPL's needs seem related to the WebUI, which I
> haven't studied yet. One would want a fairly detailed view into the running
> execution graph, possibly but not necessarily as an HTTP API.
>
> I haven't studied how the execution graph is instantiated yet; again, I'd
> rather inject this logic via composition than have it hard-coded into the
> existing implementation. Will have to study more.

OK. In any case, feel free to post more questions here.

– Ufuk