Hey Jae,

You're correct. You'll need to write a custom SystemStreamPartitionGrouper. You can use the config object in getSystemStreamPartitionGrouper to pull out your container count (yarn.container.count) and group the SSPs into that many tasks. This guarantees you'll have exactly as many tasks as containers (i.e. one task per container).
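Something along these lines should do it. This is an untested sketch written from memory, so double-check the imports and the factory interface against your Samza version; the class names and the round-robin bucketing are just placeholders for whatever assignment you prefer:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
import org.apache.samza.system.SystemStreamPartition;

// Factory: pulls the container count out of the job config.
public class ContainerCountGrouperFactory implements SystemStreamPartitionGrouperFactory {
  @Override
  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
    // Default to 1 if yarn.container.count isn't set.
    int containerCount = config.getInt("yarn.container.count", 1);
    return new ContainerCountGrouper(containerCount);
  }
}

// Grouper: round-robins the SSPs into exactly containerCount tasks,
// so each container ends up with a single task holding several SSPs.
class ContainerCountGrouper implements SystemStreamPartitionGrouper {
  private final int containerCount;

  ContainerCountGrouper(int containerCount) {
    this.containerCount = containerCount;
  }

  @Override
  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
    Map<TaskName, Set<SystemStreamPartition>> groups =
        new HashMap<TaskName, Set<SystemStreamPartition>>();
    int i = 0;
    for (SystemStreamPartition ssp : ssps) {
      // The bucket index decides which task (and therefore which container) gets this SSP.
      TaskName taskName = new TaskName("Task-" + (i % containerCount));
      if (!groups.containsKey(taskName)) {
        groups.put(taskName, new HashSet<SystemStreamPartition>());
      }
      groups.get(taskName).add(ssp);
      i++;
    }
    return groups;
  }
}

You'd then register the factory class in your job config (job.systemstreampartition.grouper.factory, if I remember the key right; check the configuration docs) so the JobCoordinator picks it up.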
Cheers,
Chris

On Fri, Feb 6, 2015 at 2:39 PM, Bae, Jae Hyeon <metac...@gmail.com> wrote:
> I am reading JobCoordinator and now I can understand why multiple
> containers were not launched. I need to create multiple tasks, which are
> then grouped again based on containerCount.
>
> On Fri, Feb 6, 2015 at 1:26 PM, Bae, Jae Hyeon <metac...@gmail.com> wrote:
>
> > Our current main use of Samza is as a data pipeline, so we don't want
> > to create multiple tasks in a single SamzaContainer. As I read the Samza
> > implementation, it will create as many tasks as the number of partitions
> > assigned to the container, right?
> >
> > The problem with that approach is that each task will have a separate
> > buffer, which is unnecessary in our use case. So, I wrote the following
> > SystemStreamPartitionGrouper:
> >
> > public class SingleTaskSystemStreamPartition implements
> >     SystemStreamPartitionGrouper {
> >   @Override
> >   public Map<TaskName, Set<SystemStreamPartition>>
> >       group(Set<SystemStreamPartition> ssps) {
> >     return new ImmutableMap.Builder<TaskName, Set<SystemStreamPartition>>()
> >         .put(new TaskName(ssps.iterator().next().getStream()), ssps)
> >         .build();
> >   }
> > }
> >
> > The above worked with yarn.container.count=1, but if I increase that
> > number, the containers can't start because they don't get assigned any
> > SystemStreamPartition.
> >
> > Could you guide me on how to write a SystemStreamPartitionGrouper that
> > creates a single task per SamzaContainer?
> >
> > Thank you
> > Best, Jae
> >
>