[ 
https://issues.apache.org/jira/browse/KAFKA-13398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434074#comment-17434074
 ] 

RivenSun edited comment on KAFKA-13398 at 10/26/21, 2:51 AM:
-------------------------------------------------------------

[~showuon] Thanks for your reply.

Thank you for the explanation of the two methods *alterPartitionReassignments* 
and *alterConsumerGroupOffsets*.

In fact, the Kafka-clients version I currently use is *2.2.2*
 I have two requirements, reset the offset for a group, support multiple 
strategies; execute reassignment for a topic.

1. I simulated the execution of kafka-consumer-groups.sh, the *resetParam* 
parameter supports a variety of legal strategies, "*-to-latest*" or 
"*-to-earliest*" or other

 
{code:java}
List<String> resetArgs = new ArrayList<>();
 resetArgs.add("--bootstrap-server");
 resetArgs.add(endPoint);
 resetArgs.add("--group");
 resetArgs.add(group);
 resetArgs.add("--topic");
 resetArgs.add(topicUrl);
 resetArgs.add("--command-config");
 resetArgs.add(propertiesFile.getAbsolutePath());
 resetArgs.add("--timeout");
 resetArgs.add("30000");
 resetArgs.add("--reset-offsets");
 resetArgs.add(resetParam);
 resetArgs.add("--execute");
ConsumerGroupCommand.main(resetArgs.toArray(new String[0]));
{code}
I'm really lazy here. The two operations of *prepareOffsetsToReset* and 
*getConsumer.commitSync(preparedOffsets.asJava)* at the bottom code of 
*ConsumerGroupCommand* can indeed be replaced by other methods, but it is far 
from the convenience of just using ConsumerGroupCommand.main(...) directly.

2. I simulated the execution of kafka-reassign-partitions.sh,
 1) First, I referred to the allocation algorithm of partition replicas when 
Kafka created the topic, and generated the reassignment I wanted by myself, 
instead of using "*-generate*" to generate
 2) In fact, in version 2.2.2, ReassignPartitionsCommand requires zookeeper to 
be a required parameter
{code:java}
List<String> reassignArgs = new ArrayList<>();
 reassignArgs.add("--reassignment-json-file");
 reassignArgs.add(kafkaTopicTempJsonFile.getAbsolutePath());
 if (StringUtil.hasValue(zookeeperConnectAddress)) {
 reassignArgs.add("--zookeeper");
 reassignArgs.add(zookeeperConnectAddress);
} else{
 ...
 reassignArgs.add("--bootstrap-server");
 reassignArgs.add(endPoint);
 reassignArgs.add("--command-config");
 reassignArgs.add(propertiesFile.getAbsolutePath());
 }
 reassignArgs.add("--execute");
ReassignPartitionsCommand.main(reassignArgs.toArray(new String[0]));
{code}
The bottom layer finally achieves this function through *KafkaZkClient* and 
*AdminZkClient*. At present, I can't find other alternatives.


was (Author: rivensun):
[~showuon] Thanks for your reply.

Thank you for the explanation of the two methods *alterPartitionReassignments* 
and *alterConsumerGroupOffsets*.

In fact, the Kafka-clients version I currently use is *2.2.2*
I have two requirements, reset the offset for a group, support multiple 
strategies; execute reassignment for a topic.

1. I simulated the execution of kafka-consumer-groups.sh, the *resetParam* 
parameter supports a variety of legal strategies, "*-to-latest*" or 
"*-to-earliest*" or other

 
{code:java}
List<String> resetArgs = new ArrayList<>();
 resetArgs.add("--bootstrap-server");
 resetArgs.add(endPoint);
 resetArgs.add("--group");
 resetArgs.add(group);
 resetArgs.add("--topic");
 resetArgs.add(topicUrl);
 resetArgs.add("--command-config");
 resetArgs.add(propertiesFile.getAbsolutePath());
 resetArgs.add("--timeout");
 resetArgs.add("30000");
 resetArgs.add("--reset-offsets");
 resetArgs.add(resetParam);
 resetArgs.add("--execute");
ConsumerGroupCommand.main(resetArgs.toArray(new String[0]));
{code}

I'm really lazy here. The two operations of *prepareOffsetsToReset* and 
*getConsumer.commitSync(preparedOffsets.asJava)* at the bottom code of 
*ConsumerGroupCommand* can indeed be replaced by other methods, but it is far 
from the convenience of just using ConsumerGroupCommand.main(...) directly.

2. I simulated the execution of kafka-reassign-partitions.sh,
1) First, I referred to the allocation algorithm of partition replicas when 
Kafka created the topic, and generated the reassignment I wanted by myself, 
instead of using "*-generate*" to generate
2)
{code:java}
List<String> reassignArgs = new ArrayList<>();
 reassignArgs.add("--reassignment-json-file");
 reassignArgs.add(kafkaTopicTempJsonFile.getAbsolutePath());
 if (StringUtil.hasValue(zookeeperConnectAddress)) {
 reassignArgs.add("--zookeeper");
 reassignArgs.add(zookeeperConnectAddress);
} else{
 ...
 reassignArgs.add("--bootstrap-server");
 reassignArgs.add(endPoint);
 reassignArgs.add("--command-config");
 reassignArgs.add(propertiesFile.getAbsolutePath());
 }
 reassignArgs.add("--execute");
ReassignPartitionsCommand.main(reassignArgs.toArray(new String[0]));
{code}

The bottom layer finally achieves this function through *KafkaZkClient* and 
*AdminZkClient*. At present, I can't find other alternatives.

> The caller program will be shut down directly when the execution of Kafka 
> script is abnormal
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13398
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>
> hello [~showuon] and [~guozhang]
> Kafka has some key functions that have not yet been integrated into 
> Java-AdminClient, so I have to use some Scala classes in the Kafka Server 
> `kafka.admin` package in my java program, such as: 
> `ReassignPartitionsCommand`, `ConsumerGroupCommand` (reset group offsets),  
> and etc., to call their `*main(args: Array[String])*` methods in order to 
> achieve specific functions.
> *Problem*:
> 1. In different Kafka versions, these Scala classes may have different 
> requirements for input parameters, or they may have different treatments for 
> the results of command execution.
>  1) `ReassignPartitionsCommand` requires  --bootstrap-server is required in 
> the latest high version,
> but requires --zookeeper in the low version.
>  Once the parameter verification fails, the *Exit.exit(1, Some(message))* 
> method will be called, which will cause my process to shut down directly.
>  2) In Kafka 3.0.0 version, there is this code at the end in the `*main(args: 
> Array[String])*` method of `ReassignPartitionsCommand`
> {code:java}
> // If the command failed, exit with a non-zero exit code.
>  if (failed) {
>  Exit.exit(1)
>  }{code}
> This will also make my process shut down directly
> So I hope that the Kafka community will be able to print out the reason and 
> stack of the corresponding exception when the parameter verification fails or 
> the execution command is abnormal, and then return from the `*main(args: 
> Array[String])*` method of the command, but don't call `*Exit.exit(...)*` 
> method. Of course, when the script is executed on the machine, there is no 
> problem with exiting directly.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to