[ https://issues.apache.org/jira/browse/KAFKA-13398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434074#comment-17434074 ]
RivenSun edited comment on KAFKA-13398 at 10/26/21, 2:51 AM:
-------------------------------------------------------------

[~showuon] Thanks for your reply, and for explaining the two methods *alterPartitionReassignments* and *alterConsumerGroupOffsets*.

The kafka-clients version I currently use is *2.2.2*.

I have two requirements: reset the offsets for a group, with support for multiple strategies; and execute a reassignment for a topic.

1. I simulate running kafka-consumer-groups.sh. The *resetParam* parameter accepts any of the valid strategies, such as "*--to-latest*" or "*--to-earliest*".
{code:java}
// Build the same argument list the shell script would pass.
List<String> resetArgs = new ArrayList<>();
resetArgs.add("--bootstrap-server");
resetArgs.add(endPoint);
resetArgs.add("--group");
resetArgs.add(group);
resetArgs.add("--topic");
resetArgs.add(topicUrl);
resetArgs.add("--command-config");
resetArgs.add(propertiesFile.getAbsolutePath());
resetArgs.add("--timeout");
resetArgs.add("30000");
resetArgs.add("--reset-offsets");
resetArgs.add(resetParam);   // e.g. "--to-latest" or "--to-earliest"
resetArgs.add("--execute");
ConsumerGroupCommand.main(resetArgs.toArray(new String[0]));
{code}
I am taking a shortcut here. The two operations *prepareOffsetsToReset* and *getConsumer.commitSync(preparedOffsets.asJava)* at the bottom of *ConsumerGroupCommand* can indeed be replaced by other methods, but that is far less convenient than calling ConsumerGroupCommand.main(...) directly.

2. I simulate running kafka-reassign-partitions.sh.
1) First, following the replica allocation algorithm Kafka uses when it creates a topic, I generate the reassignment I want myself instead of using "*--generate*".
2) In version 2.2.2, ReassignPartitionsCommand treats --zookeeper as a required parameter.
{code:java}
List<String> reassignArgs = new ArrayList<>();
reassignArgs.add("--reassignment-json-file");
reassignArgs.add(kafkaTopicTempJsonFile.getAbsolutePath());
if (StringUtil.hasValue(zookeeperConnectAddress)) {
    // Older versions: the command talks to ZooKeeper.
    reassignArgs.add("--zookeeper");
    reassignArgs.add(zookeeperConnectAddress);
} else {
    ...
    reassignArgs.add("--bootstrap-server");
    reassignArgs.add(endPoint);
    reassignArgs.add("--command-config");
    reassignArgs.add(propertiesFile.getAbsolutePath());
}
reassignArgs.add("--execute");
ReassignPartitionsCommand.main(reassignArgs.toArray(new String[0]));
{code}
Underneath, this is ultimately implemented through *KafkaZkClient* and *AdminZkClient*. At present, I can't find any alternative.

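The comment mentions generating the reassignment by hand and passing it through "--reassignment-json-file". As a rough illustration of what such a file could contain, here is a minimal sketch that builds the JSON document in the `{"version":1,"partitions":[...]}` layout read by kafka-reassign-partitions.sh. The placement rule is a plain round-robin chosen for brevity; it is an assumption and not the algorithm Kafka uses when creating a topic. The class name, the topic name "demo-topic" and the broker ids are hypothetical as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ReassignmentJson {

    // Simplified round-robin placement (an assumption, NOT Kafka's own algorithm):
    // replica r of partition p is placed on the broker at index (p + r) % brokerCount.
    static List<List<Integer>> assignReplicas(int partitions, int replicationFactor,
                                              List<Integer> brokerIds) {
        if (replicationFactor < 1 || replicationFactor > brokerIds.size()) {
            throw new IllegalArgumentException(
                    "replication factor must be between 1 and the broker count");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokerIds.get((p + r) % brokerIds.size()));
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    // Renders the assignment as a reassignment JSON document.
    // The topic name is not escaped; this assumes it contains only
    // letters, digits, '.', '_' and '-'.
    static String toJson(String topic, List<List<Integer>> assignment) {
        List<String> entries = new ArrayList<>();
        for (int p = 0; p < assignment.size(); p++) {
            String replicas = assignment.get(p).stream()
                    .map(String::valueOf)
                    .collect(Collectors.joining(","));
            entries.add("{\"topic\":\"" + topic + "\",\"partition\":" + p
                    + ",\"replicas\":[" + replicas + "]}");
        }
        return "{\"version\":1,\"partitions\":[" + String.join(",", entries) + "]}";
    }

    public static void main(String[] args) {
        // Hypothetical topic with 3 partitions, replication factor 2, brokers 1..3.
        List<List<Integer>> assignment = assignReplicas(3, 2, List.of(1, 2, 3));
        System.out.println(toJson("demo-topic", assignment));
    }
}
```

The resulting string would be written to the temporary file whose path is passed after "--reassignment-json-file".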
> The caller program will be shut down directly when the execution of Kafka script is abnormal
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13398
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>
> hello [~showuon] and [~guozhang]
> Kafka has some key functions that have not yet been integrated into the
> Java AdminClient, so in my Java program I have to use some Scala classes from
> the Kafka Server `kafka.admin` package, such as `ReassignPartitionsCommand`
> and `ConsumerGroupCommand` (reset group offsets), and call their
> `*main(args: Array[String])*` methods to achieve specific functions.
> *Problem*:
> 1. In different Kafka versions, these Scala classes may have different
> requirements for input parameters, or may handle the results of command
> execution differently.
> 1) `ReassignPartitionsCommand` requires --bootstrap-server in recent
> versions, but requires --zookeeper in older versions.
> Once parameter validation fails, *Exit.exit(1, Some(message))* is called,
> which shuts down my process directly.
> 2) In Kafka 3.0.0, the `*main(args: Array[String])*` method of
> `ReassignPartitionsCommand` ends with this code:
> {code:java}
> // If the command failed, exit with a non-zero exit code.
> if (failed) {
>   Exit.exit(1)
> }{code}
> This also shuts down my process directly.
> So I hope the Kafka community can print the reason and stack trace of the
> corresponding exception when parameter validation fails or the command
> fails, and then return from the `*main(args: Array[String])*` method of the
> command instead of calling `*Exit.exit(...)*`.
> Of course, when the script is executed on the machine, there is no
> problem with exiting directly.
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
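
The request in the quoted issue, that a command report its failure and return rather than terminate the JVM, can be illustrated with a swappable exit procedure. The following is a minimal sketch under stated assumptions: `ExitHook`, `fakeCommandMain` and `runTrapped` are hypothetical names written for this example and are not Kafka APIs. Kafka ships its own `Exit` utility with a replaceable exit procedure, but whether swapping it is appropriate for a given version or deployment is an assumption to verify, so the sketch uses a self-contained stand-in.

```java
public class ExitTrapDemo {

    /** Thrown by the trapping procedure instead of terminating the JVM. */
    static final class ExitTrappedException extends RuntimeException {
        final int statusCode;

        ExitTrappedException(int statusCode, String message) {
            super("exit(" + statusCode + ")" + (message == null ? "" : ": " + message));
            this.statusCode = statusCode;
        }
    }

    /** Hypothetical stand-in for a tool's exit utility with a swappable procedure. */
    static final class ExitHook {
        interface Procedure {
            void execute(int statusCode, String message);
        }

        // Default behaviour: really terminate the JVM, as a CLI script would.
        private static final Procedure DEFAULT = (code, msg) -> System.exit(code);
        private static volatile Procedure procedure = DEFAULT;

        static void set(Procedure p) { procedure = p; }
        static void reset() { procedure = DEFAULT; }
        static void exit(int statusCode, String message) { procedure.execute(statusCode, message); }
    }

    /** Hypothetical command entry point that exits when its arguments are invalid. */
    static void fakeCommandMain(String[] args) {
        if (args.length == 0) {
            ExitHook.exit(1, "missing required argument");
        }
        System.out.println("command ran with " + args.length + " argument(s)");
    }

    /** Runs the command and returns its exit status instead of killing the caller. */
    static int runTrapped(String[] args) {
        ExitHook.set((code, msg) -> { throw new ExitTrappedException(code, msg); });
        try {
            fakeCommandMain(args);
            return 0;
        } catch (ExitTrappedException e) {
            System.err.println("command failed: " + e.getMessage());
            return e.statusCode;
        } finally {
            // Always restore the default so unrelated code exits normally.
            ExitHook.reset();
        }
    }

    public static void main(String[] args) {
        System.out.println("status = " + runTrapped(new String[0]));
        System.out.println("status = " + runTrapped(new String[] {"--execute"}));
    }
}
```

The trapping procedure throws rather than returns, so code after the exit call in the command never runs, which matches what the command's author would expect from a real exit. The swap is process-wide, so concurrent callers would need to coordinate.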