ijuma commented on a change in pull request #9855:
URL: https://github.com/apache/kafka/pull/9855#discussion_r559056476



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##########
@@ -174,16 +174,16 @@ class ReplicaFetcherThreadTest {
       val fetchState = fetcher.fetchState(tp).get
 
       assertEquals(
-        s"Partition $tp should${if (!shouldBeReadyForFetch) " NOT" else ""} be 
ready for fetching",
-        shouldBeReadyForFetch, fetchState.isReadyForFetch)
+        shouldBeReadyForFetch,

Review comment:
       We can probably improve the formatting of these asserts since the 
description is now at the end.

##########
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithZKClientTest.scala
##########
@@ -35,16 +34,14 @@ class TopicCommandWithZKClientTest extends 
ZooKeeperTestHarness with Logging wit
   private var topicService: ZookeeperTopicService = _
   private var testTopicName: String = _
 
-  private val _testName = new TestName
-  @Rule def testName = _testName
-
-  @Before
-  def setup(): Unit = {
+  @BeforeEach
+  def setup(info: TestInfo): Unit = {
     topicService = ZookeeperTopicService(zkClient)
-    testTopicName = 
s"${testName.getMethodName}-${Random.alphanumeric.take(10).mkString}"
+    // the method name in junit 5 ends with "()"

Review comment:
       This comment is stale, right?

##########
File path: 
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestNotAuthorizedTest.scala
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import kafka.network.SocketServer
+import kafka.server.{AlterCredentialsTest, BaseRequestTest, KafkaConfig}
+import org.apache.kafka.clients.admin.ScramMechanism
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, 
AlterUserScramCredentialsResponse}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import java.util
+import java.util.Properties
+import scala.jdk.CollectionConverters._
+
+/**
+ * see AlterUserScramCredentialsRequestTest
+ */
+class AlterUserScramCredentialsRequestNotAuthorizedTest extends 
BaseRequestTest {
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+    properties.put(KafkaConfig.AuthorizerClassNameProp, 
classOf[AlterCredentialsTest.TestAuthorizer].getName)
+    properties.put(KafkaConfig.PrincipalBuilderClassProp, 
classOf[AlterCredentialsTest.TestPrincipalBuilderReturningUnauthorized].getName)
+  }
+
+  private val user1 = "user1"
+  private val user2 = "user2"
+
+  @Test
+  def testAlterNothingNotAuthorized(): Unit = {
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
+        .setUpsertions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(0, results.size)
+  }
+
+  @Test
+  def testAlterSomethingNotAuthorized(): Unit = {
+

Review comment:
       Unintentional?

##########
File path: 
core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestNotAuthorizedTest.scala
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server

Review comment:
       There should be no `unit.` here.

##########
File path: core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
##########
@@ -150,16 +150,11 @@ class LogDirFailureTest extends IntegrationTestHarness {
     TestUtils.causeLogDirFailure(failureType, leaderServer, partition)
 
     // send() should fail due to either KafkaStorageException or 
NotLeaderOrFollowerException
-    try {
-      producer.send(record).get(6000, TimeUnit.MILLISECONDS)
-      fail("send() should fail with either KafkaStorageException or 
NotLeaderOrFollowerException")
-    } catch {
-      case e: ExecutionException =>
-        e.getCause match {
-          case t: KafkaStorageException =>
-          case t: NotLeaderOrFollowerException => // This may happen if 
ProduceRequest version <= 3
-          case t: Throwable => fail(s"send() should fail with either 
KafkaStorageException or NotLeaderOrFollowerException instead of ${t.toString}")
-        }
+    val e = assertThrows(classOf[ExecutionException], () => 
producer.send(record).get(6000, TimeUnit.MILLISECONDS))
+    e.getCause match {
+      case t: KafkaStorageException =>
+      case t: NotLeaderOrFollowerException => // This may happen if 
ProduceRequest version <= 3
+      case t: Throwable => fail(s"send() should fail with either 
KafkaStorageException or NotLeaderOrFollowerException instead of ${t.toString}")

Review comment:
       Maybe use `assertTrue` here?

##########
File path: core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
##########
@@ -88,12 +88,12 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
 
     (1 to 50).foreach(i => TestUtils.produceMessage(servers, tp.topic, 
i.toString))
     // Verify that the new config is used for all segments
-    assertTrue("Log segment size change not applied", 
log.logSegments.forall(_.size > 1000))
+    assertTrue(log.logSegments.forall(_.size > 1000), "Log segment size change 
not applied")
   }
 
   private def testQuotaConfigChange(user: String, clientId: String, 
rootEntityType: String, configEntityName: String): Unit = {
-    assertTrue("Should contain a ConfigHandler for " + rootEntityType ,
-               
this.servers.head.dynamicConfigHandlers.contains(rootEntityType))
+    
assertTrue(this.servers.head.dynamicConfigHandlers.contains(rootEntityType) ,
+               rootEntityType + "Should contain a ConfigHandler for ")

Review comment:
       Looks like something weird happened here.

##########
File path: core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
##########
@@ -339,12 +334,9 @@ class ReassignPartitionsUnitTest {
 
       // The proposed assignment should only span the provided brokers
       proposed.values.foreach {
-        replicas => {
-          if (!replicas.forall(goalBrokers.contains(_))) {
-            Assert.fail(s"Proposed assignment ${proposed} puts replicas on 
brokers " +
-              s"other than ${goalBrokers}")
-          }
-        }
+        replicas =>

Review comment:
       This can be in the line above.

##########
File path: 
core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
##########
@@ -373,7 +368,8 @@ class PreferredReplicaLeaderElectionCommandTest extends 
ZooKeeperTestHarness wit
     val preferredReplicaElection = new 
PreferredReplicaLeaderElectionCommand(zkClient, Set(new TopicPartition(topic, 
partition)))
     preferredReplicaElection.moveLeaderToPreferredReplica()
     val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, 
topic, partition, oldLeaderOpt = Some(currentLeader))
-    assertEquals("Preferred replica election failed", preferredReplica, 
newLeader)
+    assertEquals(preferredReplica, newLeader,
+      "Preferred replica election failed")

Review comment:
       Can probably be all on the same line.

##########
File path: core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
##########
@@ -261,14 +261,11 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
   @Test
   def testConfigChangeOnNonExistingTopic(): Unit = {
     val topic = TestUtils.tempTopic()
-    try {
+    assertThrows(classOf[UnknownTopicOrPartitionException], () => {
       val logProps = new Properties()
       logProps.put(FlushMessagesProp, 10000: java.lang.Integer)
       adminZkClient.changeTopicConfig(topic, logProps)

Review comment:
       Isn't it better to only have the exception throwing code inside the 
`assertThrows`?

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
##########
@@ -86,13 +86,8 @@ class TransactionMarkerRequestCompletionHandlerTest {
 
     val response = new WriteTxnMarkersResponse(new 
java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]())
 
-    try {
-      handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 
0, "client", 1),
-        null, null, 0, 0, false, null, null, response))
-      fail("should have thrown illegal argument exception")
-    } catch {
-      case _: IllegalStateException => // ok
-    }
+    assertThrows(classOf[IllegalStateException], () => handler.onComplete(new 
ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+    null, null, 0, 0, false, null, null, response)))

Review comment:
       Indent?

##########
File path: 
core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
##########
@@ -70,14 +70,11 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with 
Logging {
     Configuration.setConfiguration(null)
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     assertFalse(JaasUtils.isZkSaslEnabled())
-    try {
+    assertThrows(classOf[KafkaException], () => {

Review comment:
       Should `assertThrows` only enclose the last expression?

##########
File path: core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
##########
@@ -337,8 +337,8 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
         time.sleep(1000)
       }
 
-      assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, maybeRecord(clientQuotaManager, "ANONYMOUS", "unknown", 
0))
+      assertEquals(0, maybeRecord(clientQuotaManager, "ANONYMOUS", "unknown", 
0),
+      "Should be unthrottled since bursty sample has rolled over")

Review comment:
       Indent.

##########
File path: 
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
##########
@@ -263,17 +254,11 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
     // shut down one broker
     servers.head.shutdown()
     servers.head.awaitShutdown()
-    try {
-      producer3.send(record).get
-      fail("Expected exception when producing to topic with fewer brokers than 
min.insync.replicas")
-    } catch {
-      case e: ExecutionException =>
-        if (!e.getCause.isInstanceOf[NotEnoughReplicasException]  &&
-            !e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException] &&
-            !e.getCause.isInstanceOf[TimeoutException]) {
-          fail("Expected NotEnoughReplicasException or 
NotEnoughReplicasAfterAppendException when producing to topic " +
-            "with fewer brokers than min.insync.replicas, but saw " + 
e.getCause)
-        }
+    assertThrows(classOf[ExecutionException], () => 
producer3.send(record).get).getCause match {
+      case _ @ (_: NotEnoughReplicasException | _: 
NotEnoughReplicasAfterAppendException | _: TimeoutException) => // pass
+      case e: Throwable =>
+        fail("Expected NotEnoughReplicasException or 
NotEnoughReplicasAfterAppendException when producing to topic " +

Review comment:
       This message is out of date, I don't think it's worth enumerating the 
exceptions in the message since people tend to forget to update it. I'd 
probably just do `assertTrue`.

##########
File path: core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
##########
@@ -138,37 +138,34 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
       // send a normal record
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 
partition, "key".getBytes(StandardCharsets.UTF_8),
         "value".getBytes(StandardCharsets.UTF_8))
-      assertEquals("Should have offset 0", 0L, producer.send(record0, 
callback).get.offset)
+      assertEquals(0L, producer.send(record0, callback).get.offset, "Should 
have offset 0")
 
       // send a record with null value should be ok
       val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 
partition, "key".getBytes(StandardCharsets.UTF_8), null)
-      assertEquals("Should have offset 1", 1L, producer.send(record1, 
callback).get.offset)
+      assertEquals(1L, producer.send(record1, callback).get.offset, "Should 
have offset 1")
 
       // send a record with null key should be ok
       val record2 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 
partition, null, "value".getBytes(StandardCharsets.UTF_8))
-      assertEquals("Should have offset 2", 2L, producer.send(record2, 
callback).get.offset)
+      assertEquals(2L, producer.send(record2, callback).get.offset, "Should 
have offset 2")
 
       // send a record with null part id should be ok
       val record3 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, 
"key".getBytes(StandardCharsets.UTF_8),
         "value".getBytes(StandardCharsets.UTF_8))
-      assertEquals("Should have offset 3", 3L, producer.send(record3, 
callback).get.offset)
+      assertEquals(3L, producer.send(record3, callback).get.offset, "Should 
have offset 3")
 
       // send a record with null topic should fail
-      try {
+      assertThrows(classOf[IllegalArgumentException], () => {
         val record4 = new ProducerRecord[Array[Byte], Array[Byte]](null, 
partition, "key".getBytes(StandardCharsets.UTF_8),
           "value".getBytes(StandardCharsets.UTF_8))

Review comment:
       Move outside `assertThrows`?

##########
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##########
@@ -608,29 +557,21 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     // assert that the topic1 now has 4 partitions
     altered = alterResult.values.get(topic1).get
     TestUtils.waitUntilTrue(() => numPartitions(topic1) == 4, "Timed out 
waiting for new partitions to appear")
-    try {
-      altered = alterResult.values.get(topic2).get
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
-        assertEquals("Topic currently has 3 partitions, which is higher than 
the requested 2.", e.getCause.getMessage)
-        // assert that the topic2 still has 3 partitions
-        assertEquals(3, numPartitions(topic2))
-    }
+    var e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic2).get)
+    assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
+    assertEquals("Topic currently has 3 partitions, which is higher than the 
requested 2.", e.getCause.getMessage)
+    // assert that the topic2 still has 3 partitions

Review comment:
       Looks like a redundant comment.

##########
File path: core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
##########
@@ -375,16 +364,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topic = topicPartition.topic
     servers = createTestTopicAndCluster(topic)
 
-    try {
+    assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => {
       // start topic deletion
       adminZkClient.deleteTopic(topic)

Review comment:
       Should this be outside of the assertThrows?

##########
File path: core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
##########
@@ -353,11 +353,11 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
       metrics.removeSensor("ProduceThrottleTime-:client1")
       // should not throw an exception even if the throttle time sensor does 
not exist.
       val throttleTime = maybeRecord(clientQuotaManager, "ANONYMOUS", 
"client1", 10000)
-      assertTrue("Should be throttled", throttleTime > 0)
+      assertTrue(throttleTime > 0, "Should be throttled")
       // the sensor should get recreated
       val throttleTimeSensor = 
metrics.getSensor("ProduceThrottleTime-:client1")
-      assertTrue("Throttle time sensor should exist", throttleTimeSensor != 
null)
-      assertTrue("Throttle time sensor should exist", throttleTimeSensor != 
null)
+      assertTrue(throttleTimeSensor != null, "Throttle time sensor should 
exist")
+      assertTrue(throttleTimeSensor != null, "Throttle time sensor should 
exist")

Review comment:
       Use `assertNotNull`? There are a few other cases like this in the file.

##########
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##########
@@ -1417,13 +1417,10 @@ class ConfigCommandTest extends ZooKeeperTestHarness 
with Logging {
 
     def checkInvalidEntity(entityType: String, entityName: Option[String], 
otherArgs: Array[String]): Unit = {
       val opts = createOpts(entityType, entityName, otherArgs)
-      try {
+      assertThrows(classOf[IllegalArgumentException], () => {
         opts.checkArgs()

Review comment:
       Move this out of the assertThrows block?

##########
File path: 
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
##########
@@ -356,17 +320,17 @@ class AlterUserScramCredentialsRequestTest extends 
BaseRequestTest {
   }
 
   private def checkAllErrorsAlteringCredentials(resultsToCheck: 
util.List[AlterUserScramCredentialsResult], expectedError: Errors, contextMsg: 
String) = {

Review comment:
       Is this still used in this file?

##########
File path: 
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestNotAuthorizedTest.scala
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import kafka.network.SocketServer
+import kafka.server.{AlterCredentialsTest, BaseRequestTest, KafkaConfig}
+import org.apache.kafka.clients.admin.ScramMechanism
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, 
AlterUserScramCredentialsResponse}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import java.util
+import java.util.Properties
+import scala.jdk.CollectionConverters._
+
+/**
+ * see AlterUserScramCredentialsRequestTest
+ */
+class AlterUserScramCredentialsRequestNotAuthorizedTest extends 
BaseRequestTest {
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+    properties.put(KafkaConfig.AuthorizerClassNameProp, 
classOf[AlterCredentialsTest.TestAuthorizer].getName)
+    properties.put(KafkaConfig.PrincipalBuilderClassProp, 
classOf[AlterCredentialsTest.TestPrincipalBuilderReturningUnauthorized].getName)
+  }
+
+  private val user1 = "user1"
+  private val user2 = "user2"
+
+  @Test
+  def testAlterNothingNotAuthorized(): Unit = {
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
+        .setUpsertions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(0, results.size)
+  }
+
+  @Test
+  def testAlterSomethingNotAuthorized(): Unit = {
+
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(util.Arrays.asList(new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)))
+        .setUpsertions(util.Arrays.asList(new 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_512.`type`)))).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(2, results.size)
+    checkAllErrorsAlteringCredentials(results, 
Errors.CLUSTER_AUTHORIZATION_FAILED, "when not authorized")
+  }
+
+  private def sendAlterUserScramCredentialsRequest(request: 
AlterUserScramCredentialsRequest, socketServer: SocketServer = 
controllerSocketServer): AlterUserScramCredentialsResponse = {
+    connectAndReceive[AlterUserScramCredentialsResponse](request, destination 
= socketServer)
+  }
+
+

Review comment:
       Unintentional empty line?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to