We currently use Samza mainly as a data pipeline, so we don't want to create
multiple tasks in a single SamzaContainer. As I read the Samza
implementation, it creates as many tasks as there are partitions assigned
to the container, right?
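To make the question concrete, here is a rough simulation of how I understand the default grouping. It assumes the default grouper behaves like Samza's GroupByPartition, putting every input partition with the same partition number into one task. Input partitions are modelled as plain "stream:partition" strings instead of SystemStreamPartition, and the "Partition N" task names are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Illustration only: partitions are "stream:partition" strings, not Samza's
// SystemStreamPartition, and task names are hypothetical.
public class GroupByPartitionSketch {

  // Puts every input partition that shares a partition number into the same
  // task, so the task count equals the number of distinct partition numbers.
  static Map<String, Set<String>> group(List<String> ssps) {
    Map<String, Set<String>> tasks = new TreeMap<>();
    for (String ssp : ssps) {
      String partition = ssp.substring(ssp.lastIndexOf(':') + 1);
      String taskName = "Partition " + partition;
      tasks.computeIfAbsent(taskName, k -> new TreeSet<>()).add(ssp);
    }
    return tasks;
  }

  public static void main(String[] args) {
    List<String> ssps = Arrays.asList("events:0", "events:1", "events:2", "events:3");
    // Four partitions of one stream give four tasks, each with its own buffer.
    System.out.println(group(ssps).keySet());
  }
}
```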

The problem with that approach is that each task has its own buffer,
which is unnecessary in our use case. So I wrote the following
SystemStreamPartitionGrouper:

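import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Set;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
import org.apache.samza.system.SystemStreamPartition;

// Puts every input partition into a single task, named after the stream of
// an arbitrary partition in the set.
public class SingleTaskSystemStreamPartition implements SystemStreamPartitionGrouper {
  @Override
  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
    if (ssps.isEmpty()) {
      return ImmutableMap.of(); // no input, no task (avoids NoSuchElementException)
    }
    return ImmutableMap.of(new TaskName(ssps.iterator().next().getStream()), ssps);
  }
}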

This works with yarn.container.count=1, but when I increase that number,
the containers can't start because they don't get any
SystemStreamPartition assigned.
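My guess at why this happens, shown as a simulation under an assumption I have not confirmed in the Samza code: the job assigns whole tasks (not individual partitions) to containers, for example round-robin. If so, a grouper that returns one task can only ever feed one container. Task names are plain strings here and the round-robin assignment is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: the round-robin task-to-container assignment below is
// an assumption, not Samza's actual implementation.
public class TaskToContainerSketch {

  // Deals whole tasks out to containers in round-robin order. A task is
  // never split, so containers beyond the task count receive nothing.
  static List<List<String>> assign(List<String> taskNames, int containerCount) {
    List<List<String>> containers = new ArrayList<>();
    for (int i = 0; i < containerCount; i++) {
      containers.add(new ArrayList<>());
    }
    for (int i = 0; i < taskNames.size(); i++) {
      containers.get(i % containerCount).add(taskNames.get(i));
    }
    return containers;
  }

  public static void main(String[] args) {
    // The single-task grouper yields exactly one task.
    List<String> tasks = Arrays.asList("events");
    System.out.println(assign(tasks, 2)); // prints [[events], []]
  }
}
```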

Could you advise me on how to write a SystemStreamPartitionGrouper that
creates a single task per SamzaContainer?
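One direction I can think of (an untested sketch, not a known-good answer) is to have the grouper build exactly as many tasks as there are containers and spread the partitions across them round-robin, so each container would end up with one task. Plain strings stand in for SystemStreamPartition and TaskName, and taskCount is a hypothetical parameter; in a real grouper it would presumably have to come from yarn.container.count, e.g. read from the Config handed to the grouper factory (an assumption on my part).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Illustration only: strings replace Samza types; taskCount is hypothetical.
public class OneTaskPerContainerSketch {

  // Splits the partitions into at most taskCount tasks, round-robin over a
  // sorted order so the result is deterministic. With fewer partitions than
  // taskCount, fewer tasks are created.
  static Map<String, Set<String>> group(Set<String> ssps, int taskCount) {
    if (taskCount < 1) {
      throw new IllegalArgumentException("taskCount must be >= 1");
    }
    Map<String, Set<String>> tasks = new TreeMap<>();
    int i = 0;
    for (String ssp : new TreeSet<>(ssps)) {
      String taskName = "Task " + (i % taskCount);
      tasks.computeIfAbsent(taskName, k -> new TreeSet<>()).add(ssp);
      i++;
    }
    return tasks;
  }

  public static void main(String[] args) {
    Set<String> ssps = new HashSet<>(Arrays.asList("events:0", "events:1", "events:2", "events:3"));
    // Two containers: two tasks, two partitions each.
    System.out.println(group(ssps, 2));
    // prints {Task 0=[events:0, events:2], Task 1=[events:1, events:3]}
  }
}
```

Round-robin over a sorted order keeps the partition-to-task mapping stable across restarts as long as the input set does not change.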

Thank you
Best, Jae
