Hi Elias,

Thanks a lot for putting up the patch for the simple job running in
Kubernetes! As Kartik mentioned, that is well aligned w/ our goal to make
Samza job launching easier. I am glad that we independently arrived at a lot
of common ideas. Let me try to give my opinions on this:

1. My view on the role of cluster management systems (e.g. YARN, Mesos,
   Docker, Kubernetes, etc.): the cluster management system should help in
   the following aspects:
   a. help to allocate/de-allocate "physical" resources to run the
      standalone SamzaContainer processes
      i. this *must* help in the initial allocation of physical resources
         (i.e. YARN/Mesos containers, Docker, etc.)
      ii. this *must* help in re-allocating the physical resource when
          trying to re-launch a failed standalone SamzaContainer process
      iii. this *should* help in dynamically allocating/de-allocating
           physical resources to re-size the Samza job
   b. help to monitor the liveness of the standalone SamzaContainer
      processes
   c. help to re-launch failed standalone SamzaContainer processes within
      reasonable limits. That should be done before the re-balancing-like
      behavior in SAMZA-516 is triggered.

2. The following functions should be in Samza itself, not relying on the
   underlying cluster management systems:
   a. Partition assignment: this would include the logic of grouping input
      topic partitions into tasks, and assigning tasks to SamzaContainer
      processes. It should be pluggable/configurable to allow static
      assignment w/o coordination (e.g. SAMZA-41), or some coordination
      implementation (e.g. SAMZA-516).
   b. Decision on re-sizing the job: this would include the logic Kartik
      and you have mentioned as item (2), that Samza itself needs to
      determine whether to reduce/increase the number of SamzaContainer
      processes needed in a job (e.g. rebalancing as in SAMZA-516 is part
      of this). This would require:
      i. an abstract interface to the cluster management system to request
         physical resources and launch additional SamzaContainer processes,
         and to shut down some SamzaContainer processes and release their
         physical resources; and
      ii. arguably, coordination to elect a leader (JobCoordinator) for
          decision making per Samza job.
   c. More advanced features of container launching that depend on
      application state (e.g. the Samza application state store). One
      example is host-affinity, which allows matching each SamzaContainer
      to a physical resource for potential state re-use. I would argue that
      most of these features would require a JobCoordinator as well.
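To make the abstract interface in 2.b.i a bit more concrete, here is a rough sketch. All names here are hypothetical (they are not Samza's actual APIs); they only illustrate the contract a JobCoordinator could use against a pluggable cluster manager (YARN, Mesos, Kubernetes, ...):

```java
// Hypothetical sketch of the abstract interface described in 2.b.i above.
// None of these names are real Samza APIs.
import java.util.ArrayList;
import java.util.List;

/** A request for one unit of "physical" resource to run a SamzaContainer. */
class ResourceRequest {
  final int memoryMb;
  final int cpuCores;
  final String preferredHost; // for host-affinity (2.c); null if no preference
  ResourceRequest(int memoryMb, int cpuCores, String preferredHost) {
    this.memoryMb = memoryMb;
    this.cpuCores = cpuCores;
    this.preferredHost = preferredHost;
  }
}

/** The adapter each cluster management system would implement. */
interface ClusterResourceManager {
  void requestResource(ResourceRequest request);            // 1.a.i, 1.a.iii
  void launchContainer(String resourceId, int containerId); // start a SamzaContainer
  void releaseResource(String resourceId);                  // down-sizing (2.b)
  void onContainerFailure(String resourceId, int exitCode); // 1.b/1.c
}

/** Trivial in-memory stand-in, just to show how the calls would compose. */
class InMemoryClusterManager implements ClusterResourceManager {
  final List<ResourceRequest> pending = new ArrayList<>();
  final List<String> launched = new ArrayList<>();
  public void requestResource(ResourceRequest r) { pending.add(r); }
  public void launchContainer(String resourceId, int containerId) {
    launched.add(resourceId + ":samza-container-" + containerId);
  }
  public void releaseResource(String resourceId) {
    launched.removeIf(s -> s.startsWith(resourceId + ":"));
  }
  public void onContainerFailure(String resourceId, int exitCode) {
    releaseResource(resourceId);                          // drop the dead one,
    requestResource(new ResourceRequest(1024, 1, null));  // re-request (1.a.ii)
  }
}
```

The point is that the re-sizing/re-launch *decisions* stay in Samza, while the cluster manager only fulfills resource requests.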
And some of my two cents on the technical details below:

On Tue, Dec 1, 2015 at 9:14 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote:

> That said, yes, I do believe dynamic assignment should be implemented. Not
> because of fault-tolerance, which I think is the domain of whatever
> container execution system is used to implement the job, but to enable
> scaling up and down of a job or the dynamic addition and removal of
> partitions without having to restart the job.

Totally agreed.

> One aspect of the StandaloneJob proposal that was somewhat off-putting was
> that the rebalancing was performed by shifting SamzaContainers around
> rather than shifting partitions among containers. That adds an extra layer
> of conceptual complexity. I understand that from an implementation
> perspective the easiest thing to do is to shift SamzaContainers around, as
> they appear to be immutable after the job starts and changing that may
> require a lot of work, but conceptually it seems to make more sense to
> shift partitions among SamzaContainers and execute a single SamzaContainer
> per "container" (e.g. YARN container, Docker container, etc). Then again,
> that is an implementation detail that would be largely hidden from the job
> developer, although it would be exposed to the job admin, as the number of
> SamzaContainers would become his parallelism limit.

Yes, I agree w/ the reasoning here. As you said, there are some
implementation details involved here, but it should not matter to the users.

> > I submit that there is also a need for a simpler type of job, lets call
> > it SimpleJob. This would be a job where, at least until dynamic
> > configuration is implemented, the JobCoordinator is executed once a
> > priori to generate a JobModel, the user can then configure the JobModel
> > in the containers' environment, and execute the containers as he sees
> > fit.

Yes, I agree w/ the idea that we probably need to support this model as well.
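For illustration, the SimpleJob flow could be sketched roughly like the code below: compute the partition-to-task-to-container assignment once up front, then hand the resulting model to whatever system launches the containers. The names are hypothetical, not Samza's actual JobModel/grouper classes, and the grouping shown is the simplest possible one:

```java
// Hypothetical sketch of an a-priori SimpleJob planner (not real Samza code).
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SimpleJobPlanner {
  /**
   * Group input partitions into tasks (here: one task per partition id),
   * then assign tasks to containers round-robin. Returns
   * containerId -> partition ids consumed by that container's tasks.
   */
  static Map<Integer, List<Integer>> plan(int numPartitions, int numContainers) {
    Map<Integer, List<Integer>> model = new TreeMap<>();
    for (int c = 0; c < numContainers; c++) {
      model.put(c, new ArrayList<>());
    }
    for (int p = 0; p < numPartitions; p++) {
      model.get(p % numContainers).add(p); // round-robin assignment
    }
    return model;
  }
}
```

E.g. 8 partitions over 3 containers gives container 0 the partitions [0, 3, 6]. Note this also shows the point above: the chosen number of containers becomes the parallelism limit until the model is recomputed.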
Just one technical issue: passing the JobModel via the container's
environment may cause the same issue that motivated us to implement the
CoordinatorStream in 0.10 (e.g. SAMZA-333). One potential solution is for
each SamzaContainer to also read from the CoordinatorStream and build up the
JobModel locally during bootstrap, which eliminates the need to run a single
JobCoordinator's HTTP API a priori. More details can be discussed on this
topic later.

> > Failure handling and container monitoring is then left as something to
> > be handled by whatever system the user chooses to execute the
> > containers.
> >
> > This model can then be used with many of the existing container
> > orchestration systems, whether it be Kubernetes, Docker Swarm, Amazon
> > EC2 Container Service, CoreOS Fleet, Helios, Marathon, Nomad, etc.
> >
> > In fact, KubernetesJob is essentially this SimpleJob proposal, except
> > that it outputs a Kubernetes config file instead of only the JobModel.

Totally agree with the above points.

Thanks a lot!

-Yi
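P.S. To make the CoordinatorStream bootstrap idea above a bit more concrete, here is a minimal sketch. All names are hypothetical and Samza's actual CoordinatorStream messages are richer; this only shows the replay-and-keep-latest idea, where each container rebuilds the model locally instead of fetching it from one JobCoordinator's HTTP endpoint:

```java
// Hypothetical sketch: replay a compacted stream of keyed config/assignment
// messages; the latest value per key wins (mirroring a log-compacted topic).
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CoordinatorStreamBootstrap {
  /** Replay messages in offset order; the last value per key wins. */
  static Map<String, String> bootstrap(List<Map.Entry<String, String>> log) {
    Map<String, String> model = new LinkedHashMap<>();
    for (Map.Entry<String, String> msg : log) {
      model.put(msg.getKey(), msg.getValue());
    }
    return model;
  }
}
```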