Hi Debraj,

Kafka doesn't support reducing the partition count of a topic; it only supports increasing it. One way to accomplish this would be to create a new topic with the desired partition count and reroute the data from the old topic (a rough sketch follows below). That said, it would be good to first understand the use case behind your request. Can you shed some light on it?
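In case it helps, here is a minimal, untested sketch of the reroute approach using the plain kafka-clients API. The topic names, partition count, group id, and serializers are placeholders for illustration, not something your job already defines:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RerouteTopic {
      public static void main(String[] args) throws Exception {
        // Placeholder names/values -- adjust for your environment.
        String oldTopic = "Topic3";
        String newTopic = "Topic3-repartitioned";
        int newPartitionCount = 2;

        Properties common = new Properties();
        common.put("bootstrap.servers", "localhost:9092");

        // 1. Create the new topic with the reduced partition count.
        try (AdminClient admin = AdminClient.create(common)) {
          admin.createTopics(Collections.singletonList(
              new NewTopic(newTopic, newPartitionCount, (short) 1))).all().get();
        }

        // 2. Copy records from the old topic into the new one.
        Properties consumerProps = new Properties();
        consumerProps.putAll(common);
        consumerProps.put("group.id", "topic3-reroute");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.putAll(common);
        producerProps.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
          consumer.subscribe(Collections.singletonList(oldTopic));
          while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if (records.isEmpty()) {
              break; // naive stop condition; fine if the old topic is no longer being written to
            }
            for (ConsumerRecord<String, String> record : records) {
              // Re-keying by the original key preserves key-based partition affinity
              // in the new topic, even with fewer partitions.
              producer.send(new ProducerRecord<>(newTopic, record.key(), record.value()));
            }
          }
          producer.flush();
        }
      }
    }

Once the copy is done you would point task.inputs at the new topic (e.g. task.inputs = kafka.Topic3-repartitioned) and restart the job.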
When the partition count of an input topic changes, the implications for a Samza job are as follows:

1. For stateless jobs, the job is shut down. If you are running in cluster mode (YARN), the containers typically get restarted and pick up the change. In Standalone mode, a new rebalance is triggered.

2. For stateful jobs, the shutdown behavior is the same. However, depending on the choice of grouper, the change may result in more or fewer tasks, which would invalidate some of the state associated with those tasks. If you have a changelog enabled, you may need to recreate the changelog topic; otherwise you may run into validation or correctness issues in your application (a rough sketch follows below the quoted message).

How Samza detects partition count changes [1] and the actions it takes [2] can be found at the links below.

Thanks,
Bharath

[1] https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
[2] https://github.com/apache/samza/blob/beb5e1b40c07c092bc6e14aafc131d96eda5fcd4/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java#L370

On Mon, Jan 6, 2020 at 4:31 AM Debraj Manna <subharaj.ma...@gmail.com> wrote:

> Anyone any thoughts on this?
>
> On Sat, Jan 4, 2020 at 5:16 PM Debraj Manna <subharaj.ma...@gmail.com> wrote:
>
> > I am using Samza on YARN with Kafka. I need to reduce the number of
> > partitions in Kafka. I am ok with some data loss. Can someone suggest
> > what should be the recommended way of doing this?
> >
> > Samza job config looks like this -
> >
> > job.factory.class = org.apache.samza.job.yarn.YarnJobFactory
> > task.class = com.vnera.grid.task.GenericStreamTask
> > task.window.ms = 100
> > systems.kafka.samza.factory = org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.consumer.zookeeper.connect = localhost:2181
> > systems.kafka.consumer.auto.offset.reset = largest
> > systems.kafka.producer.metadata.broker.list = localhost:9092
> > systems.kafka.producer.producer.type = sync
> > systems.kafka.producer.batch.num.messages = 1
> > systems.kafka.samza.key.serde = string
> > serializers.registry.string.class = org.apache.samza.serializers.StringSerdeFactory
> > yarn.package.path = file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> > yarn.container.count = ${container.count}
> > yarn.am.container.memory.mb = ${samzajobs.memory.mb}
> > job.name = job4
> > task.inputs = kafka.Topic3
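Regarding the changelog note in point 2 above: purely as an illustration (the changelog topic name here is hypothetical and depends on whatever your stores.<store-name>.changelog config points at), recreating the changelog topic so its partition count lines up with the new task count could look roughly like this with the Kafka admin client:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class RecreateChangelog {
      public static void main(String[] args) throws Exception {
        // Hypothetical changelog topic name -- use whatever stores.<store-name>.changelog is set to.
        String changelogTopic = "job4-my-store-changelog";
        int newPartitionCount = 2; // should match the new input partition / task count

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
          // Drop the old changelog topic (this discards the persisted state it held).
          admin.deleteTopics(Collections.singletonList(changelogTopic)).all().get();
          // Recreate it with the new partition count; in practice you may need to wait
          // for the deletion to fully propagate before the create succeeds.
          admin.createTopics(Collections.singletonList(
              new NewTopic(changelogTopic, newPartitionCount, (short) 1))).all().get();
        }
      }
    }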