Hi Debraj,

Kafka doesn't support reducing the partition size and only supports
increasing the partition size of a topic.
One way to accomplish it would be to create a new topic with the desired
partition count and reroute data from the old topic. Although, it will be
good to first understand the use case behind your request. Can you
shed some light on this?

In the event of change to input topic partition count, the implications to
a Samza job are as follows

   1. For stateless jobs, the job is shutdown and if you are running in a
   cluster mode (YARN), typically containers get restarted and pick up the
   change. In case of Standalone, a new rebalance is triggered.
   2. For stateful jobs, the shutdown behavior is the same. However, based
   on the choice of the grouper, it might result in additional tasks or
   reduced number of tasks which would invalidate some of the state associated
   with the tasks. If you have changelog enabled, you might need to recreate
   the changelog topic otherwise, you might run into validation issues or
   correctness issues with your application.

As of how Samza detects partition count changes[1] and the actions it takes
can be found here[2]

Thanks,
Bharath


[1] -
https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
[2] -
https://github.com/apache/samza/blob/beb5e1b40c07c092bc6e14aafc131d96eda5fcd4/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java#L370



On Mon, Jan 6, 2020 at 4:31 AM Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Anyone any thoughts on this?
>
> On Sat, Jan 4, 2020 at 5:16 PM Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
> > I am using samza on yarn with Kafka. I need to reduce the number of
> > partitions in kafka. I am ok with some data loss. Can someone suggest
> what
> > should be the recommended way of doing this?
> >
> > Samza Job Config looks like this -
> >
> > job.factory.class = org.apache.samza.job.yarn.YarnJobFactory
> > task.class = com.vnera.grid.task.GenericStreamTask
> > task.window.ms = 100
> > systems.kafka.samza.factory =
> > org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.consumer.zookeeper.connect = localhost:2181
> > systems.kafka.consumer.auto.offset.reset = largest
> > systems.kafka.producer.metadata.broker.list = localhost:9092
> > systems.kafka.producer.producer.type = sync
> > systems.kafka.producer.batch.num.messages = 1
> > systems.kafka.samza.key.serde = string
> > serializers.registry.string.class =
> > org.apache.samza.serializers.StringSerdeFactory
> > yarn.package.path =
> > file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> > yarn.container.count = ${container.count}
> > yarn.am.container.memory.mb =  ${samzajobs.memory.mb}
> > job.name = job4
> > task.inputs = kafka.Topic3
> >
>

Reply via email to