ijuma commented on a change in pull request #9855: URL: https://github.com/apache/kafka/pull/9855#discussion_r559056476
########## File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala ########## @@ -174,16 +174,16 @@ class ReplicaFetcherThreadTest { val fetchState = fetcher.fetchState(tp).get assertEquals( - s"Partition $tp should${if (!shouldBeReadyForFetch) " NOT" else ""} be ready for fetching", - shouldBeReadyForFetch, fetchState.isReadyForFetch) + shouldBeReadyForFetch, Review comment: We can probably improve the formatting of these asserts since the description is now at the end. ########## File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithZKClientTest.scala ########## @@ -35,16 +34,14 @@ class TopicCommandWithZKClientTest extends ZooKeeperTestHarness with Logging wit private var topicService: ZookeeperTopicService = _ private var testTopicName: String = _ - private val _testName = new TestName - @Rule def testName = _testName - - @Before - def setup(): Unit = { + @BeforeEach + def setup(info: TestInfo): Unit = { topicService = ZookeeperTopicService(zkClient) - testTopicName = s"${testName.getMethodName}-${Random.alphanumeric.take(10).mkString}" + // the method name in junit 5 ends with "()" Review comment: This comment is stale, right? ########## File path: core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestNotAuthorizedTest.scala ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.server + +import kafka.network.SocketServer +import kafka.server.{AlterCredentialsTest, BaseRequestTest, KafkaConfig} +import org.apache.kafka.clients.admin.ScramMechanism +import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData +import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import java.util +import java.util.Properties +import scala.jdk.CollectionConverters._ + +/** + * see AlterUserScramCredentialsRequestTest + */ +class AlterUserScramCredentialsRequestNotAuthorizedTest extends BaseRequestTest { + + override def brokerPropertyOverrides(properties: Properties): Unit = { + properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") + properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName) + properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilderReturningUnauthorized].getName) + } + + private val user1 = "user1" + private val user2 = "user2" + + @Test + def testAlterNothingNotAuthorized(): Unit = { + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]) + .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(0, results.size) + } + + @Test + def testAlterSomethingNotAuthorized(): Unit = { + Review comment: Unintentional? ########## File path: core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestNotAuthorizedTest.scala ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.server Review comment: There should be no `unit.` here. ########## File path: core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala ########## @@ -150,16 +150,11 @@ class LogDirFailureTest extends IntegrationTestHarness { TestUtils.causeLogDirFailure(failureType, leaderServer, partition) // send() should fail due to either KafkaStorageException or NotLeaderOrFollowerException - try { - producer.send(record).get(6000, TimeUnit.MILLISECONDS) - fail("send() should fail with either KafkaStorageException or NotLeaderOrFollowerException") - } catch { - case e: ExecutionException => - e.getCause match { - case t: KafkaStorageException => - case t: NotLeaderOrFollowerException => // This may happen if ProduceRequest version <= 3 - case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderOrFollowerException instead of ${t.toString}") - } + val e = assertThrows(classOf[ExecutionException], () => producer.send(record).get(6000, TimeUnit.MILLISECONDS)) + e.getCause match { + case t: KafkaStorageException => + case t: NotLeaderOrFollowerException => // This may happen if ProduceRequest version <= 3 + case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderOrFollowerException instead of ${t.toString}") Review comment: Maybe use `assertTrue` here? ########## File path: core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala ########## @@ -88,12 +88,12 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { (1 to 50).foreach(i => TestUtils.produceMessage(servers, tp.topic, i.toString)) // Verify that the new config is used for all segments - assertTrue("Log segment size change not applied", log.logSegments.forall(_.size > 1000)) + assertTrue(log.logSegments.forall(_.size > 1000), "Log segment size change not applied") } private def testQuotaConfigChange(user: String, clientId: String, rootEntityType: String, configEntityName: String): Unit = { - assertTrue("Should contain a ConfigHandler for " + rootEntityType , - this.servers.head.dynamicConfigHandlers.contains(rootEntityType)) + assertTrue(this.servers.head.dynamicConfigHandlers.contains(rootEntityType) , + rootEntityType + "Should contain a ConfigHandler for ") Review comment: Looks like something weird happened here. ########## File path: core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala ########## @@ -339,12 +334,9 @@ class ReassignPartitionsUnitTest { // The proposed assignment should only span the provided brokers proposed.values.foreach { - replicas => { - if (!replicas.forall(goalBrokers.contains(_))) { - Assert.fail(s"Proposed assignment ${proposed} puts replicas on brokers " + - s"other than ${goalBrokers}") - } - } + replicas => Review comment: This can be in the line above. ########## File path: core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala ########## @@ -373,7 +368,8 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(new TopicPartition(topic, partition))) preferredReplicaElection.moveLeaderToPreferredReplica() val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader)) - assertEquals("Preferred replica election failed", preferredReplica, newLeader) + assertEquals(preferredReplica, newLeader, + "Preferred replica election failed") Review comment: Can probably be all on the same line. ########## File path: core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala ########## @@ -261,14 +261,11 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def testConfigChangeOnNonExistingTopic(): Unit = { val topic = TestUtils.tempTopic() - try { + assertThrows(classOf[UnknownTopicOrPartitionException], () => { val logProps = new Properties() logProps.put(FlushMessagesProp, 10000: java.lang.Integer) adminZkClient.changeTopicConfig(topic, logProps) Review comment: Isn't it better to only have the exception throwing code inside the `assertThrows`? ########## File path: core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala ########## @@ -86,13 +86,8 @@ class TransactionMarkerRequestCompletionHandlerTest { val response = new WriteTxnMarkersResponse(new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()) - try { - handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), - null, null, 0, 0, false, null, null, response)) - fail("should have thrown illegal argument exception") - } catch { - case _: IllegalStateException => // ok - } + assertThrows(classOf[IllegalStateException], () => handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1), + null, null, 0, 0, false, null, null, response))) Review comment: Indent? ########## File path: core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala ########## @@ -70,14 +70,11 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { Configuration.setConfiguration(null) System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) assertFalse(JaasUtils.isZkSaslEnabled()) - try { + assertThrows(classOf[KafkaException], () => { Review comment: Should `assertThrows` only enclose the last expression? ########## File path: core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala ########## @@ -337,8 +337,8 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { time.sleep(1000) } - assertEquals("Should be unthrottled since bursty sample has rolled over", - 0, maybeRecord(clientQuotaManager, "ANONYMOUS", "unknown", 0)) + assertEquals(0, maybeRecord(clientQuotaManager, "ANONYMOUS", "unknown", 0), + "Should be unthrottled since bursty sample has rolled over") Review comment: Indent. ########## File path: core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ########## @@ -263,17 +254,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { // shut down one broker servers.head.shutdown() servers.head.awaitShutdown() - try { - producer3.send(record).get - fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas") - } catch { - case e: ExecutionException => - if (!e.getCause.isInstanceOf[NotEnoughReplicasException] && - !e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException] && - !e.getCause.isInstanceOf[TimeoutException]) { - fail("Expected NotEnoughReplicasException or NotEnoughReplicasAfterAppendException when producing to topic " + - "with fewer brokers than min.insync.replicas, but saw " + e.getCause) - } + assertThrows(classOf[ExecutionException], () => producer3.send(record).get).getCause match { + case _ @ (_: NotEnoughReplicasException | _: NotEnoughReplicasAfterAppendException | _: TimeoutException) => // pass + case e: Throwable => + fail("Expected NotEnoughReplicasException or NotEnoughReplicasAfterAppendException when producing to topic " + Review comment: This message is out of date, I don't think it's worth enumerating the exceptions in the message since people tend to forget to update it. I'd probably just do `assertTrue`. ########## File path: core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala ########## @@ -138,37 +138,34 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { // send a normal record val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)) - assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset) + assertEquals(0L, producer.send(record0, callback).get.offset, "Should have offset 0") // send a record with null value should be ok val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8), null) - assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset) + assertEquals(1L, producer.send(record1, callback).get.offset, "Should have offset 1") // send a record with null key should be ok val record2 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8)) - assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset) + assertEquals(2L, producer.send(record2, callback).get.offset, "Should have offset 2") // send a record with null part id should be ok val record3 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)) - assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset) + assertEquals(3L, producer.send(record3, callback).get.offset, "Should have offset 3") // send a record with null topic should fail - try { + assertThrows(classOf[IllegalArgumentException], () => { val record4 = new ProducerRecord[Array[Byte], Array[Byte]](null, partition, "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)) Review comment: Move outside `assertThrows`? ########## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala ########## @@ -608,29 +557,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // assert that the topic1 now has 4 partitions altered = alterResult.values.get(topic1).get TestUtils.waitUntilTrue(() => numPartitions(topic1) == 4, "Timed out waiting for new partitions to appear") - try { - altered = alterResult.values.get(topic2).get - } catch { - case e: ExecutionException => - assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException]) - assertEquals("Topic currently has 3 partitions, which is higher than the requested 2.", e.getCause.getMessage) - // assert that the topic2 still has 3 partitions - assertEquals(3, numPartitions(topic2)) - } + var e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get) + assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException]) + assertEquals("Topic currently has 3 partitions, which is higher than the requested 2.", e.getCause.getMessage) + // assert that the topic2 still has 3 partitions Review comment: Looks like a redundant comment. ########## File path: core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ########## @@ -375,16 +364,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val topic = topicPartition.topic servers = createTestTopicAndCluster(topic) - try { + assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => { // start topic deletion adminZkClient.deleteTopic(topic) Review comment: Should this be outside of the assertThrows? ########## File path: core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala ########## @@ -353,11 +353,11 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { metrics.removeSensor("ProduceThrottleTime-:client1") // should not throw an exception even if the throttle time sensor does not exist. val throttleTime = maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 10000) - assertTrue("Should be throttled", throttleTime > 0) + assertTrue(throttleTime > 0, "Should be throttled") // the sensor should get recreated val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:client1") - assertTrue("Throttle time sensor should exist", throttleTimeSensor != null) - assertTrue("Throttle time sensor should exist", throttleTimeSensor != null) + assertTrue(throttleTimeSensor != null, "Throttle time sensor should exist") + assertTrue(throttleTimeSensor != null, "Throttle time sensor should exist") Review comment: Use `assertNotNull`? There are a few other cases like this in the file. ########## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ########## @@ -1417,13 +1417,10 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { def checkInvalidEntity(entityType: String, entityName: Option[String], otherArgs: Array[String]): Unit = { val opts = createOpts(entityType, entityName, otherArgs) - try { + assertThrows(classOf[IllegalArgumentException], () => { opts.checkArgs() Review comment: Move this out of the assertThrows block? ########## File path: core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala ########## @@ -356,17 +320,17 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { } private def checkAllErrorsAlteringCredentials(resultsToCheck: util.List[AlterUserScramCredentialsResult], expectedError: Errors, contextMsg: String) = { Review comment: Is this still used in this file? ########## File path: core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestNotAuthorizedTest.scala ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.server + +import kafka.network.SocketServer +import kafka.server.{AlterCredentialsTest, BaseRequestTest, KafkaConfig} +import org.apache.kafka.clients.admin.ScramMechanism +import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData +import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import java.util +import java.util.Properties +import scala.jdk.CollectionConverters._ + +/** + * see AlterUserScramCredentialsRequestTest + */ +class AlterUserScramCredentialsRequestNotAuthorizedTest extends BaseRequestTest { + + override def brokerPropertyOverrides(properties: Properties): Unit = { + properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") + properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName) + properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilderReturningUnauthorized].getName) + } + + private val user1 = "user1" + private val user2 = "user2" + + @Test + def testAlterNothingNotAuthorized(): Unit = { + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]) + .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(0, results.size) + } + + @Test + def testAlterSomethingNotAuthorized(): Unit = { + + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`))) + .setUpsertions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_512.`type`)))).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(2, results.size) + checkAllErrorsAlteringCredentials(results, Errors.CLUSTER_AUTHORIZATION_FAILED, "when not authorized") + } + + private def sendAlterUserScramCredentialsRequest(request: AlterUserScramCredentialsRequest, socketServer: SocketServer = controllerSocketServer): AlterUserScramCredentialsResponse = { + connectAndReceive[AlterUserScramCredentialsResponse](request, destination = socketServer) + } + + Review comment: Unintentional empty line? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org