There are a few ways to achieve the data copy:

1. Use a vanilla Kafka consumer that consumes from the old topic and produces to the new topic with fewer partitions (a minimal sketch of this option follows below).
2. Write a Samza job that reads from your old topic and funnels the data to the new topic.
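For option 1, a minimal sketch could look like the following. This assumes string keys and values; the topic names and the consumer group id are placeholders, and the broker address is taken from your config below:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class TopicCopier {
      public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-copier");  // placeholder group id
        cProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pProps = new Properties();
        pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
          consumer.subscribe(Collections.singletonList("old-topic"));
          while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            if (records.isEmpty()) {
              break;  // crude termination heuristic: stop once the backlog is drained
            }
            for (ConsumerRecord<String, String> r : records) {
              // Keep the original key so the default partitioner redistributes
              // records across the smaller partition set of the new topic.
              producer.send(new ProducerRecord<>("new-topic", r.key(), r.value()));
            }
            producer.flush();
            consumer.commitSync();
          }
        }
      }
    }

Note the loop exits on the first empty poll, which is only a rough signal that the backlog is drained; a production copier would want a more robust termination condition.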
I'd also recommend following up with the Kafka community if you are looking for more options.

You typically don't have to delete the checkpoint and coordinator topics. The checkpoints of the new tasks should overwrite those of the old tasks. However, you might be left with some stale data, since the new topic has fewer partitions and hence fewer tasks. The coordinator topic stores config for the most part, so it may retain some stale topic configurations (if any). For the delete/recreate steps in your list, see the sketch at the bottom of this mail.

Hope that helps.

Thanks,
Bharath

On Tue, Jan 7, 2020 at 6:19 AM Debraj Manna <subharaj.ma...@gmail.com> wrote:

> Thanks Bharath for replying.
>
> The Samza job is stateless and running in a YARN cluster.
>
> If I follow the below approach:
>
> 1. Create a temp kafka topic
> 2. Copy the messages from the old topic to the temp topic
> 3. Delete the old topic
> 4. Create a new topic with the required partitions
> 5. Copy the messages from the temp topic to the new topic
>
> What should I do with the __samza_checkpoint and __samza_coordinator_
> topics? Should I also delete them?
>
> Can you explain what you mean by reroute?
>
> On Mon, Jan 6, 2020 at 7:40 PM Bharath Kumara Subramanian <
> codin.mart...@gmail.com> wrote:
>
> > Hi Debraj,
> >
> > Kafka doesn't support reducing the partition count of a topic; it only
> > supports increasing it. One way to accomplish this is to create a new
> > topic with the desired partition count and reroute data from the old
> > topic. That said, it would be good to first understand the use case
> > behind your request. Can you shed some light on this?
> >
> > In the event of a change to the input topic partition count, the
> > implications for a Samza job are as follows:
> >
> > 1. For stateless jobs, the job is shut down. If you are running in
> > cluster mode (YARN), the containers typically get restarted and pick
> > up the change. In the case of Standalone, a new rebalance is
> > triggered.
> > 2. For stateful jobs, the shutdown behavior is the same. However,
> > based on the choice of grouper, the change might result in additional
> > tasks or a reduced number of tasks, which would invalidate some of the
> > state associated with the tasks. If you have a changelog enabled, you
> > might need to recreate the changelog topic; otherwise, you might run
> > into validation or correctness issues with your application.
> >
> > How Samza detects partition count changes [1] and the actions it
> > takes [2] can be found here:
> >
> > Thanks,
> > Bharath
> >
> > [1] - https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
> > [2] - https://github.com/apache/samza/blob/beb5e1b40c07c092bc6e14aafc131d96eda5fcd4/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java#L370
> >
> > On Mon, Jan 6, 2020 at 4:31 AM Debraj Manna <subharaj.ma...@gmail.com>
> > wrote:
> >
> > > Anyone any thoughts on this?
> > >
> > > On Sat, Jan 4, 2020 at 5:16 PM Debraj Manna <subharaj.ma...@gmail.com>
> > > wrote:
> > >
> > > > I am using Samza on YARN with Kafka. I need to reduce the number
> > > > of partitions in Kafka. I am OK with some data loss. Can someone
> > > > suggest the recommended way of doing this?
> > > > Samza Job Config looks like this:
> > > >
> > > > job.factory.class = org.apache.samza.job.yarn.YarnJobFactory
> > > > task.class = com.vnera.grid.task.GenericStreamTask
> > > > task.window.ms = 100
> > > > systems.kafka.samza.factory = org.apache.samza.system.kafka.KafkaSystemFactory
> > > > systems.kafka.consumer.zookeeper.connect = localhost:2181
> > > > systems.kafka.consumer.auto.offset.reset = largest
> > > > systems.kafka.producer.metadata.broker.list = localhost:9092
> > > > systems.kafka.producer.producer.type = sync
> > > > systems.kafka.producer.batch.num.messages = 1
> > > > systems.kafka.samza.key.serde = string
> > > > serializers.registry.string.class = org.apache.samza.serializers.StringSerdeFactory
> > > > yarn.package.path = file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> > > > yarn.container.count = ${container.count}
> > > > yarn.am.container.memory.mb = ${samzajobs.memory.mb}
> > > > job.name = job4
> > > > task.inputs = kafka.Topic3
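Steps 3 and 4 in your list (delete the old topic, then recreate it with the required partition count) can also be done programmatically with Kafka's AdminClient. A minimal sketch, reusing Topic3 and the broker address from the config above; the target partition count (4) and replication factor (1) are placeholders:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Collections;
    import java.util.Properties;

    public class TopicRecreator {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
          // Step 3: delete the old topic and wait for the call to complete.
          admin.deleteTopics(Collections.singletonList("Topic3")).all().get();
          // Step 4: recreate it with fewer partitions.
          // 4 partitions and replication factor 1 are placeholder values.
          admin.createTopics(Collections.singletonList(
              new NewTopic("Topic3", 4, (short) 1))).all().get();
        }
      }
    }

The same can be done with the kafka-topics CLI. Either way, note that topic deletion completes asynchronously on the broker side, so the create call may need a retry if it runs immediately after the delete.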