Hey Kovas, sorry for the long delay.
> On 10 Jan 2016, at 06:20, kovas boguta <kovas.bog...@gmail.com> wrote:
>
> 1) How can I prevent ResultPartitions from being released?
>
> In interactive use, RPs should not necessarily be released when there are no
> pending tasks to consume them.

Max Michels did some work along those lines in a by now very outdated pull request: https://github.com/apache/flink/pull/640

> Looking at the code, its hard to understand the high-level logic of who
> triggers their release, how the refcounting works, etc. For instance, is
> releasePartitionsProducedBy called by the producer, or the consumer, or both?
> Is the refcount initialized at the beginning of the task setup, and
> decremented every time its read out?

I fully agree that this part of the system is very much under-documented. Sorry about that.

- ResultPartitionManager: The result partition manager keeps track of all partitions of a task manager. Each task registers its produced partitions when it is instantiated (see the Task constructor and NetworkEnvironment#registerTask). This is the final truth about which partitions are available.

- releasePartitionsProducedBy: This is not part of the regular life cycle of the results; it is only triggered by the job manager to get rid of old results in case of cancellation or failure.

- Ref counts: The release of the results during normal operation happens via the ref counts. Currently, the ref counts are always initialised to the number of subpartitions (a result partition consists of one or more subpartitions, one for each parallel receiver of the result). Decrementing happens when a subpartition has been fully consumed (via ResultPartition#onConsumedSubpartition). As soon as the ref count reaches 0, the partition is released. The final release happens in the ResultPartitionManager.

The behaviour that would need to change is that last step in the result partition manager, imo. I think #640 has some modifications in that respect, which might be helpful in figuring out the details.
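To make the lifecycle easier to follow, here is a minimal standalone sketch of the ref-count mechanism described above. The class and method names deliberately mirror the real ones (ResultPartition, ResultPartitionManager, onConsumedSubpartition), but these are simplified toy stand-ins I made up for illustration, not Flink's actual classes or signatures:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in: ref count starts at the number of subpartitions. */
class ToyResultPartition {
    final String id;
    private int refCount;
    private final ToyResultPartitionManager manager;

    ToyResultPartition(String id, int numberOfSubpartitions, ToyResultPartitionManager manager) {
        this.id = id;
        this.refCount = numberOfSubpartitions;
        this.manager = manager;
        manager.register(this);              // registration at task instantiation time
    }

    /** Mirrors ResultPartition#onConsumedSubpartition: one receiver is done. */
    void onConsumedSubpartition() {
        refCount--;
        if (refCount == 0) {
            // This is the step that would change for interactive use:
            // instead of the final release, the result could be kept "cached".
            manager.release(this);
        }
    }
}

/** Toy stand-in for the per-task-manager ResultPartitionManager. */
class ToyResultPartitionManager {
    private final Map<String, ToyResultPartition> partitions = new HashMap<>();

    void register(ToyResultPartition p) { partitions.put(p.id, p); }
    void release(ToyResultPartition p)  { partitions.remove(p.id); }
    boolean isRegistered(String id)     { return partitions.containsKey(id); }
}

public class RefCountSketch {
    public static void main(String[] args) {
        ToyResultPartitionManager manager = new ToyResultPartitionManager();
        ToyResultPartition partition = new ToyResultPartition("p1", 2, manager);

        partition.onConsumedSubpartition();              // first receiver finishes
        System.out.println(manager.isRegistered("p1"));  // still registered

        partition.onConsumedSubpartition();              // last receiver -> release
        System.out.println(manager.isRegistered("p1"));  // gone
    }
}
```

The point of the sketch is just where the hook sits: the count-to-zero transition is the single place where "release" could become "cache".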
I think what can happen instead of the final release is that a result becomes “cached” and stays around.

> Ideally, I could force certain ResultPartitions to only be manually
> releasable, so I can consume them over and over.

How would you like this to be controlled by the user? Via an API operation like `.pin()`? Do you need the result pinned permanently until you release it, or would it be OK to just cache it and maybe recompute it if another task needs more memory/disk space?

> 2) Can I call attachJobGraph on the ExecutionGraph after the job has begun
> executing to add more nodes?
>
> I read that Flink does not support changing the running the topology. But
> what about extending the topology to add more nodes?

I think that should be possible. The problem at the moment is that this recomputes everything from the sources. Your suggestion makes sense, and this was actually one of the motivations for #640: the newly attached nodes would back-track to the produced results of the initial topology and go on from there.

> If the IntermediateResultPartition are just sitting around from previously
> completely tasks, this seems straightforward in principle.. would one have to
> fiddle with the scheduler/event system to kick things off again?

I think it will be possible to just submit the new parts, but I'm not sure about the details.

> 3) Can I have a ConnectionManager without a TaskManager?
>
> For interactive use, I want to selectively pull data from ResultPartitions
> into my local REPL process. But I don't want my local process to be a
> TaskManager node, and have tasks assigned to it.
>
> So this question boils down to, how to hook a ConnectionManager into the
> Flink communication/actor system?

In theory it should be possible, yes. In practice I see problems with setting up all the parameters. You have to instantiate a NettyConnectionManager. For the start method, only the NetworkBufferPool is relevant, which is easy to instantiate as well.
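Before going into the remaining details, here is a toy model of the consumer-side plumbing: remote channels (one per consumed subpartition, identified by a partition ID and an address) feed an input gate that hands out raw byte buffers, and a record reader turns those buffers back into records. These are simplified stand-ins with made-up names and no actual networking, not Flink's real classes:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy stand-in for a RemoteChannel: one consumed subpartition. */
class ToyRemoteChannel {
    final String partitionId;
    final String address;            // stands in for the ConnectionID / InetSocketAddress
    private final Queue<byte[]> buffers = new ArrayDeque<>();

    ToyRemoteChannel(String partitionId, String address) {
        this.partitionId = partitionId;
        this.address = address;
    }

    void receive(byte[] buffer) { buffers.add(buffer); }
    byte[] poll()               { return buffers.poll(); }
}

/** Toy stand-in for a SingleInputGate: multiplexes channels, returns raw buffers. */
class ToySingleInputGate {
    private final List<ToyRemoteChannel> channels = new ArrayList<>();

    void addChannel(ToyRemoteChannel c) { channels.add(c); }

    byte[] getNextBuffer() {
        for (ToyRemoteChannel c : channels) {
            byte[] b = c.poll();
            if (b != null) return b;
        }
        return null;                 // nothing available right now
    }
}

/** Toy "record reader": deserializes buffers (here: just UTF-8 strings). */
class ToyRecordReader {
    private final ToySingleInputGate gate;
    ToyRecordReader(ToySingleInputGate gate) { this.gate = gate; }

    String nextRecord() {
        byte[] b = gate.getNextBuffer();
        return b == null ? null : new String(b, StandardCharsets.UTF_8);
    }
}

public class ReplConsumerSketch {
    public static void main(String[] args) {
        // Partition ID and address are made up for the example.
        ToyRemoteChannel channel = new ToyRemoteChannel("partition-0", "tm-1:6121");
        channel.receive("hello".getBytes(StandardCharsets.UTF_8));

        ToySingleInputGate gate = new ToySingleInputGate();
        gate.addChannel(channel);

        ToyRecordReader reader = new ToyRecordReader(gate);
        System.out.println(reader.nextRecord());
    }
}
```

In the real setup the connection manager would fill the channels over the network; the sketch only shows how the pieces relate.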
The connection manager will take care of the actual network transfers. To get the data, you need a SingleInputGate set up with RemoteChannel instances for each consumed subpartition. This is where you need to know all the partition IDs and task managers (the connection ID is a wrapper around InetSocketAddress with a connection index for connection sharing). When you have the gate set up, you can query it with the RecordReader. The input gate itself just returns byte buffers.

Do you have an idea about how you want to figure out the partition and connection IDs in your REPL process? If yes, I could provide some more concrete code snippets on the setup.

---

If you would like to contribute to Flink, we can also think about splitting this up into some smaller tasks and addressing them. Your ideas are definitely in line with what we wanted to have in Flink anyway. The recent focus on the streaming part of the system has pulled most contributors away from the batch parts, though.

I would suggest also looking at the changes in #640. Although the PR is rather old, the network stack has not seen many changes recently.

Feel free to post further questions! :)

– Ufuk