On Thu, Jan 14, 2016 at 5:52 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Kovas
>
> sorry for the long delay.

It was worth the wait! Thanks for the detailed response.

> > Ideally, I could force certain ResultPartitions to only be manually
> > releasable, so I can consume them over and over.
>
> How would you like to have this controlled by the user? Offer an API
> operation like `.pin()`? Do you need it pinned permanently until you
> release it, or would it be OK to just cache it and maybe recompute if
> another task needs more memory/disk space?

If another task needs resources, I would expect cached data to be moved to
lower levels of the storage hierarchy. We don't want to block or degrade new
computations, because it is impossible to tell the user upfront that their
new request will exceed the available resources.

I think extending the approach of Tachyon might be interesting. Currently,
they have up to three tiers: memory, local disk, and a durable "understore"
(HDFS, S3, etc.). One could add a fourth possibility, call it "null", which
would mean: just drop the data and recompute it if needed. Then one could
specify a policy as a list like [memory, null] or [memory, local disk, null]
and have pretty reasonable control.

However it's done, I hope it's in the nice compositional, interface-driven
style that Flink enjoys today. "Simple" is much better than "easy" for me.
There are a bunch of questions, and having a well-thought-out architecture
to achieve different behaviors seems more valuable to me than any specific
top-level API approach.

One issue/question: how would the user refer to an existing dataset in
subsequent operations? When you create a dataset at the top-level API, is it
automatically assigned an ID? The separation of logical and physical plans
is a Good Thing. When submitting additional plans, one needs to refer to
previous nodes, and whether nodes are assigned identity by value or by
reference makes a difference.
> To get the data you need a SingleInputGate and set it up with
> RemoteChannel instances for each consumed subpartition. This is where you
> need to know all the partition IDs and task managers (the connection ID is
> a wrapper around InetSocketAddress with a connection index for connection
> sharing).
>
> When you have the gate set up, you can query it with the RecordReader. The
> input gate itself just returns byte buffers.

This part is fairly well described in the docs, and I was able to more or
less figure it out. Cool stuff.

> Do you have an idea about how you want to figure out the partition and
> connection IDs in your REPL process? If yes, I could provide some more
> concrete code snippets on the setup.

For hacking purposes, my plan was to manually instantiate the execution
graph and instrument some of the nodes so I can get information out. My REPL
would run in the same process as the JobManager.

For a "real" solution, the REPL's needs seem related to the WebUI, which I
haven't studied yet. One would want a fairly detailed view into the running
execution graph, possibly but not necessarily as an HTTP API.

I haven't studied how the execution graph is instantiated yet; again, I'd
rather inject this logic via composition than have it hard-coded into the
existing implementation. Will have to study more. Thanks for the pointers!

> ---
>
> If you would like to contribute to Flink, we can also think about
> splitting this up into some smaller tasks and addressing them. Your ideas
> are definitely in line with what we wanted to have in Flink anyway. The
> recent focus on the streaming part of the system has pulled most
> contributors away from the batch parts, though. I would suggest also
> looking at the changes in #640. Although the PR is rather old, the network
> stack has not seen many changes in recent times.
>
> Feel free to post further questions! :)
>
> – Ufuk
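P.S. To check my understanding of the bookkeeping your gate setup implies,
here is a tiny self-contained sketch (plain Java, no Flink on the classpath;
`SubpartitionLocation` and `GatePlan` are names I made up, standing in very
roughly for a ResultPartitionID plus producer connection info, not Flink's
actual classes) of what a REPL-side consumer would have to collect per
consumed subpartition before it could wire up remote channels:

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical descriptor: everything a remote consumer must know up front.
// Names are illustrative only, not Flink's network-stack API.
class SubpartitionLocation {
    final UUID partitionId;              // which result partition
    final int subpartitionIndex;         // which parallel slice of it
    final InetSocketAddress taskManager; // where the producing task runs
    final int connectionIndex;           // for sharing one TCP connection

    SubpartitionLocation(UUID partitionId, int subpartitionIndex,
                         InetSocketAddress taskManager, int connectionIndex) {
        this.partitionId = partitionId;
        this.subpartitionIndex = subpartitionIndex;
        this.taskManager = taskManager;
        this.connectionIndex = connectionIndex;
    }
}

// A "gate" over several subpartitions is then just the collected locations;
// a real implementation would open one remote channel per entry and hand
// the resulting byte buffers to a record reader.
class GatePlan {
    final List<SubpartitionLocation> channels = new ArrayList<>();

    void addChannel(SubpartitionLocation loc) {
        channels.add(loc);
    }

    int numChannels() {
        return channels.size();
    }
}

public class GatePlanDemo {
    public static void main(String[] args) {
        GatePlan plan = new GatePlan();
        plan.addChannel(new SubpartitionLocation(
            UUID.randomUUID(), 0,
            new InetSocketAddress("tm-1.example", 6121), 0));
        plan.addChannel(new SubpartitionLocation(
            UUID.randomUUID(), 1,
            new InetSocketAddress("tm-2.example", 6121), 0));
        System.out.println(plan.numChannels()); // prints 2
    }
}
```

If that is roughly the right shape, then the REPL question reduces to: where
does this table of locations come from, the instrumented execution graph or
a WebUI-style query API?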