On Tue, Jan 19, 2016 at 11:37 AM, Camelia Elena Ciolac <came...@chalmers.se> wrote:
> Hello,
>
> I list some questions gathered while reading documentation on Flink's
> internals and I am grateful to receive your answers.
>
> 1) How is the JobManager involved in the communication between tasks
> running in task slots on TaskManagers?
>
> From [1] it appears to me that, as part of the control flow for data
> exchange, every time a task has a result that is consumable, it notifies
> the JobManager.
> This is repeated in [2]: "When producing an intermediate result, the
> producing task is responsible to notify the JobManager about available
> data".
> However, the result's size might as well be 1, in the case of data
> streams, so I ask this question to understand better if for each data
> exchange the JobManager is involved.

The JobManager is involved when the result becomes consumable. Depending on
the type of result, this is either when the first record/buffer has been
produced (pipelined results) or when all records/buffers have been produced
(blocking results). The notification can either kick off the deployment of
the consuming tasks or update them with more specific information about the
location of the results to be consumed. The notification does not happen for
each buffer that is transferred. The data exchange itself happens from task
manager to task manager over a separate TCP connection.

> 2) In case my understanding of the aforementioned control flow is correct,
> then from the following options, which is the data outcome that triggers a
> notification of the JobManager: a ResultPartition, a ResultSubpartition or
> a Buffer?

The ResultPartition, once at least one (pipelined) or all (blocking) of its
buffers have been produced. The sub partitions contain all data for one
parallel consumer sub task (for example, reducer sub task 1 out of a total of
4 reducer sub tasks).

> 3) Idem, then are the Akka actors involved in this notification of data
> availability for consumption?

Yes, all distributed coordination happens via Akka.
The actual data exchange happens via a custom TCP stack built on Netty. As an
example: the Akka messages contain information about which result to request
from which task manager, and the request and the data transfer happen via TCP.

> 4) How is this co-existence of receiver-initiated data
> transfers (the consumer requests the data partitions) with the push data
> transfers [2] between 2 tasks orchestrated?

The JobManager acts as the coordinator of the system and holds all required
information in the ExecutionGraph data structure. The execution graph is an
asynchronous state machine used for scheduling and tracking the progress of
deployed tasks. At deployment, the tasks either already know where to request
their data from, or they receive updates at runtime if the result location is
not known at deployment time.

> From [3] I retain the paragraph:
>
> "The JobManager also acts as the input split assigner for data sources. It
> is responsible for distributing the work across all TaskManagers such that
> data locality is preserved where possible. In order to dynamically balance
> the load, the Tasks request a new input split after they have finished
> processing the old one. This request is realized by sending a
> RequestNextInputSplit to the JobManager. The JobManager responds with a
> NextInputSplit message. If there are no more input splits, then the input
> split contained in the message is null.
>
> The Tasks are deployed lazily to the TaskManagers. This means that tasks
> which consume data are only deployed after one of its producers has
> finished producing some data. Once the producer has done so, it sends a
> ScheduleOrUpdateConsumers message to the JobManager. This message says
> that the consumer can now read the newly produced data. If the consuming
> task is not yet running, it will be deployed to a TaskManager."
>
> 5) Can you please summarize how this data exchange occurs in Flink, based
> on an example?
>

WordCount with parallelism 2:

[ Source -> Map 0 ] |-> ResultPartition 0 with 2 sub partitions <--+--- [ Reduce 0 -> Sink 0 ]
[ Source -> Map 1 ] |-> ResultPartition 1 with 2 sub partitions <--+--- [ Reduce 1 -> Sink 1 ]

Assume that Result 0 is pipelined. At runtime, Result 0 is made up of two
ResultPartitions (one for each parallel Source->Map pipeline), each with two
sub partitions (one for each consuming Reduce->Sink pipeline).

The JobManager schedules the source tasks (which are chained with the
mappers). As soon as a Source->Map pipeline produces its first data, the
JobManager receives an Akka message to deploy the consuming tasks (it will
receive several of these, one of which will be the first). Because Result 0
is consumed in an all-to-all fashion, this triggers the deployment of both
reducers. At this point the JobManager tries to schedule the reducers, which
then request their respective sub partitions from both ResultPartition 0 and
1, e.g. [Reduce 0->Sink 0] requests sub partition 0 of ResultPartition 0 and
1, and [Reduce 1->Sink 1] requests sub partition 1 of ResultPartition 0 and 1.

This is the general flow of things (missing some detail regarding runtime
updates of the consumers). For blocking results, the notifications happen
when all parallel Source->Map pipelines have produced their data.

> Thank you!

Does this help? Of course, feel free to ask further questions.

– Ufuk
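
P.S. In case it helps to see the WordCount flow above in executable form, here is a toy Python model. It is only a sketch and not Flink code: the classes `JobManager` and `ResultPartition`, the method `schedule_or_update_consumers` (named after the ScheduleOrUpdateConsumers message), and the function `run_wordcount` are hypothetical stand-ins. It assumes that each partition notifies the JobManager at most once, and that for blocking results the consumers are deployed only after every partition has reported.

```python
from enum import Enum


class ResultType(Enum):
    PIPELINED = "pipelined"  # consumable once the first buffer exists
    BLOCKING = "blocking"    # consumable only after the producer has finished


class JobManager:
    """Toy coordinator: counts notifications and deploys the consumers once."""

    def __init__(self, num_producers, num_consumers):
        self.num_producers = num_producers
        self.num_consumers = num_consumers
        self.notifications = 0
        self.reported = set()
        self.deployed_on_notification = None
        self.requests = []  # (consumer, partition, sub partition)

    def schedule_or_update_consumers(self, partition):
        self.notifications += 1
        self.reported.add(partition.index)
        if self.deployed_on_notification is not None:
            return  # already deployed; a real system would update the consumers
        # Assumption: blocking results wait until every partition has reported.
        if (partition.result_type is ResultType.BLOCKING
                and len(self.reported) < self.num_producers):
            return
        self.deployed_on_notification = self.notifications
        # All-to-all: consumer i requests sub partition i of every partition.
        for consumer in range(self.num_consumers):
            for producer in range(self.num_producers):
                self.requests.append((consumer, producer, consumer))


class ResultPartition:
    """Output of one parallel producer, one sub partition per consumer."""

    def __init__(self, index, num_subpartitions, result_type):
        self.index = index
        self.result_type = result_type
        self.subpartitions = [[] for _ in range(num_subpartitions)]
        self.finished = False
        self.notified = False

    def add_buffer(self, subpartition, buffer, job_manager):
        self.subpartitions[subpartition].append(buffer)
        self._notify_if_consumable(job_manager)

    def finish(self, job_manager):
        self.finished = True
        self._notify_if_consumable(job_manager)

    def _notify_if_consumable(self, job_manager):
        consumable = self.finished or self.result_type is ResultType.PIPELINED
        if consumable and not self.notified:
            self.notified = True  # one notification per partition, not per buffer
            job_manager.schedule_or_update_consumers(self)


def run_wordcount(result_type, parallelism=2, buffers_per_producer=3):
    """Run the two Source->Map producers one after the other."""
    jm = JobManager(parallelism, parallelism)
    for index in range(parallelism):
        partition = ResultPartition(index, parallelism, result_type)
        for b in range(buffers_per_producer):
            partition.add_buffer(b % parallelism, f"buffer-{index}-{b}", jm)
        partition.finish(jm)
    return jm


if __name__ == "__main__":
    for result_type in ResultType:
        jm = run_wordcount(result_type)
        print(result_type.value, jm.notifications,
              jm.deployed_on_notification, jm.requests)
```

With the defaults, six buffers are produced but the toy JobManager sees only two notifications in both modes. In the pipelined case the reducers are deployed on the first notification, in the blocking case on the second. The actual data transfer (Netty/TCP) and runtime updates of already running consumers are left out entirely.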