[ https://issues.apache.org/jira/browse/KAFKA-5588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502787#comment-16502787 ]
ASF GitHub Bot commented on KAFKA-5588: --------------------------------------- ijuma closed pull request #5097: KAFKA-5588: Remove deprecated new-consumer option for tools URL: https://github.com/apache/kafka/pull/5097 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 689a63c2e4a..44d09fda5fa 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -889,8 +889,6 @@ object ConsumerGroupCommand extends Logging { "Pass in just a topic to delete the given topic's partition offsets and ownership information " + "for every consumer group. For instance --topic t1" + nl + "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active." - val NewConsumerDoc = "Use the new consumer implementation. This is the default, so this option is deprecated and " + - "will be removed in a future release." val TimeoutMsDoc = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + "to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " + "or is going through some changes)." 
@@ -943,7 +941,6 @@ object ConsumerGroupCommand extends Logging { val listOpt = parser.accepts("list", ListDoc) val describeOpt = parser.accepts("describe", DescribeDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) - val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc) val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc) .withRequiredArg .describedAs("timeout (ms)") @@ -1011,16 +1008,9 @@ object ConsumerGroupCommand extends Logging { if (useOldConsumer) { if (options.has(bootstrapServerOpt)) CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.") - else if (options.has(newConsumerOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.") } else { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) - if (options.has(newConsumerOpt)) { - Console.err.println(s"The --new-consumer option is deprecated and will be removed in a future major release. " + - s"The new consumer is used by default if the --bootstrap-server option is provided.") - } - if (options.has(deleteOpt) && options.has(topicOpt)) CommandLineUtils.printUsageAndDie(parser, s"When deleting a consumer group the option $topicOpt is only " + s"valid with $zkConnectOpt. The new consumer does not support topic-specific offset deletion from a consumer group.") diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index c1f8b81a066..b3103ebfa88 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -324,8 +324,6 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("metrics directory") .ofType(classOf[java.lang.String]) - val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation. 
This is the default, so " + - "this option is deprecated and will be removed in a future release.") val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED (unless old consumer is used): The server to connect to.") .withRequiredArg .describedAs("server to connect to") @@ -397,8 +395,6 @@ object ConsoleConsumer extends Logging { if (useOldConsumer) { if (options.has(bootstrapServerOpt)) CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.") - else if (options.has(newConsumerOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.") val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) if (topicOrFilterOpt.size != 1) CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.") @@ -449,11 +445,6 @@ object ConsoleConsumer extends Logging { if (!useOldConsumer) { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) - - if (options.has(newConsumerOpt)) { - Console.err.println("The --new-consumer option is deprecated and will be removed in a future major release. " + - "The new consumer is used by default if the --bootstrap-server option is provided.") - } } if (options.has(csvMetricsReporterEnabledOpt)) { diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 4fff87745e7..f4221fe2946 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -296,8 +296,6 @@ object ConsumerPerformance extends LazyLogging { .describedAs("count") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation. 
This is the default, so " + - "this option is deprecated and will be removed in a future release.") val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") .withRequiredArg .describedAs("config file") @@ -325,11 +323,6 @@ object ConsumerPerformance extends LazyLogging { if (!useOldConsumer) { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServersOpt) - if (options.has(newConsumerOpt)) { - Console.err.println("The --new-consumer option is deprecated and will be removed in a future major release. " + - "The new consumer is used by default if the --broker-list option is provided.") - } - import org.apache.kafka.clients.consumer.ConsumerConfig props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt)) props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)) @@ -342,8 +335,7 @@ object ConsumerPerformance extends LazyLogging { } else { if (options.has(bootstrapServersOpt)) CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServersOpt is not valid with $zkConnectOpt.") - else if (options.has(newConsumerOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.") + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, numMessagesOpt) props.put("group.id", options.valueOf(groupIdOpt)) props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala index 4cc28372449..ef3b17c3938 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala @@ -16,6 +16,7 @@ */ package unit.kafka.admin +import joptsimple.OptionException import kafka.admin.ConsumerGroupCommandTest import kafka.utils.TestUtils import org.apache.kafka.common.protocol.Errors @@ -24,12 
+25,11 @@ import org.junit.Test class DeleteConsumerGroupTest extends ConsumerGroupCommandTest { - @Test(expected = classOf[joptsimple.OptionException]) + @Test(expected = classOf[OptionException]) def testDeleteWithTopicOption() { TestUtils.createOffsetsTopic(zkClient, servers) val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group, "--topic") getConsumerGroupService(cgcArgs) - fail("Expected an error due to presence of mutually exclusive options") } @Test @@ -222,4 +222,10 @@ class DeleteConsumerGroupTest extends ConsumerGroupCommandTest { result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.COORDINATOR_NOT_AVAILABLE)) } + + @Test(expected = classOf[OptionException]) + def testDeleteWithUnrecognizedNewConsumerOption() { + val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--delete", "--group", group) + getConsumerGroupService(cgcArgs) + } } diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index 5725568e10e..a2361b706a3 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -16,6 +16,7 @@ */ package kafka.admin +import joptsimple.OptionException import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.RoundRobinAssignor import org.apache.kafka.common.TopicPartition @@ -112,12 +113,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { } } - @Test(expected = classOf[joptsimple.OptionException]) + @Test(expected = classOf[OptionException]) def testDescribeWithMultipleSubActions() { TestUtils.createOffsetsTopic(zkClient, servers) val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--members", "--state") getConsumerGroupService(cgcArgs) - fail("Expected an error due to presence of mutually exclusive options") } 
@Test @@ -662,6 +662,12 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { } } + @Test(expected = classOf[joptsimple.OptionException]) + def testDescribeWithUnrecognizedNewConsumerOption() { + val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--describe", "--group", group) + getConsumerGroupService(cgcArgs) + fail("Expected an error due to presence of unrecognized --new-consumer option") + } } diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala index 13dccbe113e..c83e0028797 100644 --- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala @@ -18,6 +18,7 @@ package kafka.admin import java.util.Properties +import joptsimple.OptionException import org.junit.Test import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService @@ -86,4 +87,9 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } + @Test(expected = classOf[OptionException]) + def testListWithUnrecognizedNewConsumerOption() { + val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--list") + getConsumerGroupService(cgcArgs) + } } diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala index 04fc428514b..116b455cb87 100644 --- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala @@ -16,6 +16,7 @@ import java.io.{BufferedWriter, File, FileWriter} import java.text.{ParseException, SimpleDateFormat} import java.util.{Calendar, Date, Properties} +import joptsimple.OptionException import 
kafka.admin.ConsumerGroupCommand.ConsumerGroupService import kafka.server.KafkaConfig import kafka.utils.TestUtils @@ -335,6 +336,13 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { adminZkClient.deleteTopic(topic) } + @Test(expected = classOf[OptionException]) + def testResetWithUnrecognizedNewConsumerOption() { + val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", + "--to-offset", "2", "--export") + getConsumerGroupService(cgcArgs) + } + private def produceMessages(topic: String, numMessages: Int): Unit = { val records = (0 until numMessages).map(_ => new ProducerRecord[Array[Byte], Array[Byte]](topic, new Array[Byte](100 * 1000))) diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 6f465557d7b..1a32bf42072 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -149,8 +149,7 @@ class ConsoleConsumerTest { val args: Array[String] = Array( "--bootstrap-server", "localhost:9092", "--topic", "test", - "--from-beginning", - "--new-consumer") //new + "--from-beginning") //When val config = new ConsoleConsumer.ConsumerConfig(args) @@ -169,8 +168,7 @@ class ConsoleConsumerTest { "--bootstrap-server", "localhost:9092", "--topic", "test", "--partition", "0", - "--offset", "3", - "--new-consumer") //new + "--offset", "3") //When val config = new ConsoleConsumer.ConsumerConfig(args) @@ -185,6 +183,25 @@ class ConsoleConsumerTest { } + @Test(expected = classOf[IllegalArgumentException]) + def shouldExitOnUnrecognizedNewConsumerOption(): Unit = { + Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) + + //Given + val args: Array[String] = Array( + "--new-consumer", + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--from-beginning") + + //When 
+ try { + new ConsoleConsumer.ConsumerConfig(args) + } finally { + Exit.resetExitProcedure() + } + } + @Test def testDefaultConsumer() { //Given @@ -200,6 +217,21 @@ class ConsoleConsumerTest { assertFalse(config.useOldConsumer) } + @Test + def testNewConsumerRemovedOption() { + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--from-beginning") + + //When + val config = new ConsoleConsumer.ConsumerConfig(args) + + //Then + assertFalse(config.useOldConsumer) + } + @Test def shouldParseValidNewSimpleConsumerValidConfigWithStringOffset() { //Given @@ -208,7 +240,6 @@ class ConsoleConsumerTest { "--topic", "test", "--partition", "0", "--offset", "LatEst", - "--new-consumer", //new "--property", "print.value=false") //When @@ -366,9 +397,6 @@ class ConsoleConsumerTest { @Test(expected = classOf[IllegalArgumentException]) def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningNewConsumer() { - - // Override exit procedure to throw an exception instead of exiting, so we can catch the exit - // properly for this test case Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) //Given @@ -384,15 +412,10 @@ class ConsoleConsumerTest { } finally { Exit.resetExitProcedure() } - - fail("Expected consumer property construction to fail due to inconsistent reset options") } @Test(expected = classOf[IllegalArgumentException]) def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningOldConsumer() { - - // Override exit procedure to throw an exception instead of exiting, so we can catch the exit - // properly for this test case Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) //Given @@ -408,8 +431,6 @@ class ConsoleConsumerTest { } finally { Exit.resetExitProcedure() } - - fail("Expected consumer property construction to fail due to inconsistent reset options") } @Test diff --git 
a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala index bafe8ed136b..bc199f658ab 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala @@ -20,7 +20,8 @@ package kafka.tools import java.io.ByteArrayOutputStream import java.text.SimpleDateFormat -import org.junit.Assert.assertEquals +import joptsimple.OptionException +import org.junit.Assert.{assertEquals, assertFalse, assertTrue} import org.junit.Test class ConsumerPerformanceTest { @@ -45,6 +46,57 @@ class ConsumerPerformanceTest { s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0")) } + @Test + def testConfigUsingNewConsumer(): Unit = { + //Given + val args: Array[String] = Array( + "--broker-list", "localhost:9092", + "--topic", "test", + "--messages", "10" + ) + + //When + val config = new ConsumerPerformance.ConsumerPerfConfig(args) + + //Then + assertFalse(config.useOldConsumer) + assertEquals("localhost:9092", config.options.valueOf(config.bootstrapServersOpt)) + assertEquals("test", config.topic) + assertEquals(10, config.numMessages) + } + + @Test + def testConfigUsingOldConsumer() { + //Given + val args: Array[String] = Array( + "--zookeeper", "localhost:2181", + "--topic", "test", + "--messages", "10") + + //When + val config = new ConsumerPerformance.ConsumerPerfConfig(args) + + //Then + assertTrue(config.useOldConsumer) + assertEquals("localhost:2181", config.options.valueOf(config.zkConnectOpt)) + assertEquals("test", config.topic) + assertEquals(10, config.numMessages) + } + + @Test(expected = classOf[OptionException]) + def testConfigUsingNewConsumerUnrecognizedOption(): Unit = { + //Given + val args: Array[String] = Array( + "--broker-list", "localhost:9092", + "--topic", "test", + "--messages", "10", + "--new-consumer" + ) + + //When + new ConsumerPerformance.ConsumerPerfConfig(args) + } + private 
def testHeaderMatchContent(detailed: Boolean, useOldConsumer: Boolean, expectedOutputLineCount: Int, fun: () => Unit): Unit = { Console.withOut(outContent) { ConsumerPerformance.printHeader(detailed, useOldConsumer) diff --git a/docs/upgrade.html b/docs/upgrade.html index 056fb8366e4..cfc3c833ffe 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -97,6 +97,14 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2 will be removed in a future version.</li> <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li> <li>The tool kafka.tools.ReplayLogProducer has been removed.</li> + <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> finally removes + the <code>--new-consumer</code> option for all consumer based tools as <code>kafka-console-consumer</code>, <code>kafka-consumer-perf-test</code> + and <code>kafka-consumer-groups</code>. + The new consumer is automatically used if the bootstrap servers list is provided on the command line + otherwise, when the zookeeper connection is provided, the old consumer is used. + The <code>--new-consumer</code> option had already been ignored as the way of selecting the consumer since Kafka 1.0.0, + this KIP just removes the option. 
+ </li> </ul> <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5> diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index ba5abc719f6..7e919b37249 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -30,7 +30,7 @@ from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.security.minikdc import MiniKdc from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.version import DEV_BRANCH +from kafkatest.version import DEV_BRANCH, LATEST_0_10_0 Port = collections.namedtuple('Port', ['name', 'number', 'open']) @@ -584,8 +584,12 @@ def list_consumer_groups(self, node=None, new_consumer=True, command_config=None command_config = "--command-config " + command_config if new_consumer: - cmd = "%s --new-consumer --bootstrap-server %s %s --list" % \ + new_consumer_opt = "" + if node.version <= LATEST_0_10_0: + new_consumer_opt = "--new-consumer" + cmd = "%s %s --bootstrap-server %s %s --list" % \ (consumer_group_script, + new_consumer_opt, self.bootstrap_servers(self.security_protocol), command_config) else: @@ -611,8 +615,14 @@ def describe_consumer_group(self, group, node=None, new_consumer=True, command_c command_config = "--command-config " + command_config if new_consumer: - cmd = "%s --new-consumer --bootstrap-server %s %s --group %s --describe" % \ - (consumer_group_script, self.bootstrap_servers(self.security_protocol), command_config, group) + new_consumer_opt = "" + if node.version <= LATEST_0_10_0: + new_consumer_opt = "--new-consumer" + cmd = "%s %s --bootstrap-server %s %s --group %s --describe" % \ + (consumer_group_script, + new_consumer_opt, + self.bootstrap_servers(self.security_protocol), + command_config, group) else: cmd = "%s --zookeeper %s %s --group %s --describe" % \ (consumer_group_script, self.zk_connect_setting(), command_config, group) 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove deprecated new-consumer option for tools > ----------------------------------------------- > > Key: KAFKA-5588 > URL: https://issues.apache.org/jira/browse/KAFKA-5588 > Project: Kafka > Issue Type: Bug > Reporter: Paolo Patierno > Assignee: Paolo Patierno > Priority: Minor > Fix For: 2.0.0 > > > Hi, > with the current version of the ConsoleConsumer, ConsumerPerformance and > ConsumerGroupCommand command line tools, it is no longer necessary to specify the > --new-consumer option in order to use the new consumer. The choice > between the old and the new one is made simply by specifying --zookeeper for > the former and --bootstrap-server for the latter. > The issues [KAFKA-5599|https://issues.apache.org/jira/browse/KAFKA-5599] and > [KAFKA-5619|https://issues.apache.org/jira/browse/KAFKA-5619], fixed and > included in the 1.0.0 release, deprecated the usage of the --new-consumer flag. > More details are in the related > [KIP-176|https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools]. > Thanks, > Paolo. -- This message was sent by Atlassian JIRA (v7.6.3#76005)