Jiabao-Sun commented on code in PR #23211:
URL: https://github.com/apache/flink/pull/23211#discussion_r1298315587


##########
flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java:
##########
@@ -71,16 +70,16 @@ public void 
stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly()
         final StringifiedAccumulatorResult[] results =
                 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
 
-        assertEquals(1, results.length);
+        assertThat(results).isNotEmpty();

Review Comment:
   ```suggestion
           assertThat(results).hasSize(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java:
##########
@@ -89,52 +88,50 @@ public void 
stringifyingResultsShouldReportNullLocalValueAsNonnullValueString()
         final StringifiedAccumulatorResult[] results =
                 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
 
-        assertEquals(1, results.length);
+        assertThat(results).isNotEmpty();

Review Comment:
   ```suggestion
           assertThat(results).hasSize(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java:
##########
@@ -89,52 +88,50 @@ public void 
stringifyingResultsShouldReportNullLocalValueAsNonnullValueString()
         final StringifiedAccumulatorResult[] results =
                 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
 
-        assertEquals(1, results.length);
+        assertThat(results).isNotEmpty();
 
         // Note the use of a String with a content of "null" rather than a 
null value
         final StringifiedAccumulatorResult firstResult = results[0];
-        assertEquals(name, firstResult.getName());
-        assertEquals("NullBearingAccumulator", firstResult.getType());
-        assertEquals("null", firstResult.getValue());
+        assertThat(firstResult.getName()).isEqualTo(name);
+        assertThat(firstResult.getType()).isEqualTo("NullBearingAccumulator");
+        assertThat(firstResult.getValue()).isEqualTo("null");
     }
 
     @Test
-    public void 
stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
+    void 
stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
         final String name = "a";
         final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = 
new HashMap<>();
         accumulatorMap.put(name, null);
 
         final StringifiedAccumulatorResult[] results =
                 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
 
-        assertEquals(1, results.length);
+        assertThat(results).isNotEmpty();
 
         // Note the use of String values with content of "null" rather than 
null values
         final StringifiedAccumulatorResult firstResult = results[0];
-        assertEquals(name, firstResult.getName());
-        assertEquals("null", firstResult.getType());
-        assertEquals("null", firstResult.getValue());
+        assertThat(firstResult.getName()).isEqualTo(name);
+        assertThat(firstResult.getType()).isEqualTo("null");
+        assertThat(firstResult.getValue()).isEqualTo("null");
     }
 
     @Test
-    public void stringifyingFailureResults() {
+    void stringifyingFailureResults() {
         final String name = "a";
         final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = 
new HashMap<>();
         accumulatorMap.put(name, OptionalFailure.ofFailure(new 
FlinkRuntimeException("Test")));
 
         final StringifiedAccumulatorResult[] results =
                 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
 
-        assertEquals(1, results.length);
+        assertThat(results).isNotEmpty();

Review Comment:
   ```suggestion
           assertThat(results).hasSize(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java:
##########
@@ -117,57 +109,57 @@ private void testDelete(@Nullable JobID jobId1, @Nullable 
JobID jobId2) throws I
 
             // put first BLOB
             TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, 
data, TRANSIENT_BLOB);
-            assertNotNull(key1);
+            assertThat(key1).isNotNull();
 
             // put two more BLOBs (same key, other key) for another job ID
             TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, 
data, TRANSIENT_BLOB);
-            assertNotNull(key2a);
+            assertThat(key2a).isNotNull();
             BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a);
             TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, 
data2, TRANSIENT_BLOB);
-            assertNotNull(key2b);
+            assertThat(key2b).isNotNull();
             BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b);
 
             // issue a DELETE request
-            assertTrue(delete(cache, jobId1, key1));
+            assertThat(delete(cache, jobId1, key1)).isTrue();
 
             // delete only works on local cache!
-            assertTrue(server.getStorageLocation(jobId1, key1).exists());
+            assertThat(server.getStorageLocation(jobId1, 
key1).exists()).isTrue();

Review Comment:
   ```suggestion
               assertThat(server.getStorageLocation(jobId1, key1)).exists();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java:
##########
@@ -83,31 +77,30 @@
  * <p>Most successful GET requests are tested in conjunction wit the PUT 
requests by {@link
  * BlobCachePutTest}.
  */
-public class BlobCacheGetTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java:
##########
@@ -315,35 +298,33 @@ private void testConcurrentDeleteOperations(@Nullable 
final JobID jobId)
 
         final byte[] data = {1, 2, 3};
 
-        try (BlobServer server =
-                        new BlobServer(config, temporaryFolder.newFolder(), 
new VoidBlobStore());
-                BlobCacheService cache =
-                        new BlobCacheService(
-                                config,
-                                temporaryFolder.newFolder(),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server.getPort()))) {
-
+        Tuple2<BlobServer, BlobCacheService> serverAndCache =
+                TestingBlobUtils.createServerAndCache(tempDir);
+        try (BlobServer server = serverAndCache.f0;
+                BlobCacheService cache = serverAndCache.f1) {
             server.start();
 
             final TransientBlobKey blobKey =
                     (TransientBlobKey) put(server, jobId, data, 
TRANSIENT_BLOB);
 
-            assertTrue(server.getStorageLocation(jobId, blobKey).exists());
+            assertThat(server.getStorageLocation(jobId, 
blobKey).exists()).isTrue();
 
             for (int i = 0; i < concurrentDeleteOperations; i++) {
                 CompletableFuture<Void> deleteFuture =
                         CompletableFuture.supplyAsync(
                                 () -> {
                                     try {
-                                        assertTrue(delete(cache, jobId, 
blobKey));
-                                        assertFalse(
-                                                cache.getTransientBlobService()
-                                                        
.getStorageLocation(jobId, blobKey)
-                                                        .exists());
+                                        assertThat(delete(cache, jobId, 
blobKey)).isTrue();
+                                        assertThat(
+                                                        
cache.getTransientBlobService()
+                                                                
.getStorageLocation(jobId, blobKey)
+                                                                .exists())
+                                                .isFalse();
                                         // delete only works on local cache!
-                                        assertTrue(
-                                                
server.getStorageLocation(jobId, blobKey).exists());
+                                        assertThat(
+                                                        
server.getStorageLocation(jobId, blobKey)
+                                                                .exists())
+                                                .isTrue();

Review Comment:
   ```suggestion
                                           
assertThat(server.getStorageLocation(jobId, blobKey))
                                                   .exists();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java:
##########
@@ -49,17 +48,13 @@
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** This class contains unit tests for the {@link BlobClient}. */
-public class BlobClientTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java:
##########
@@ -18,135 +18,43 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Random;
-
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
-import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashDifferent;
-import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
-import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
-import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
-import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertTrue;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 /** Tests for the recovery of files of a {@link BlobCacheService} from a HA 
store. */
-public class BlobCacheRecoveryTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java:
##########
@@ -58,12 +59,15 @@
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
+import static 
org.apache.flink.runtime.blob.TestingBlobHelpers.checkFileCountForJob;
+import static org.apache.flink.runtime.blob.TestingBlobHelpers.checkFilesExist;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
 
 /** A few tests for the cleanup of transient BLOBs at the {@link BlobServer}. 
*/
-public class BlobServerCleanupTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java:
##########
@@ -21,29 +21,28 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.blob.BlobKey.BlobType;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link BlobCacheSizeTracker}. */
-public class BlobCacheSizeTrackerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java:
##########
@@ -21,57 +21,45 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.core.testutils.FlinkAssertions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.apache.commons.io.FileUtils;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Random;
+import java.nio.file.Path;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
-import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
-import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Tests how GET requests react to corrupt files when downloaded via a {@link 
BlobCacheService}.
  *
  * <p>Successful GET requests are tested in conjunction wit the PUT requests.
  */
-public class BlobCacheCorruptionTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java:
##########
@@ -117,57 +109,57 @@ private void testDelete(@Nullable JobID jobId1, @Nullable 
JobID jobId2) throws I
 
             // put first BLOB
             TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, 
data, TRANSIENT_BLOB);
-            assertNotNull(key1);
+            assertThat(key1).isNotNull();
 
             // put two more BLOBs (same key, other key) for another job ID
             TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, 
data, TRANSIENT_BLOB);
-            assertNotNull(key2a);
+            assertThat(key2a).isNotNull();
             BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a);
             TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, 
data2, TRANSIENT_BLOB);
-            assertNotNull(key2b);
+            assertThat(key2b).isNotNull();
             BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b);
 
             // issue a DELETE request
-            assertTrue(delete(cache, jobId1, key1));
+            assertThat(delete(cache, jobId1, key1)).isTrue();
 
             // delete only works on local cache!
-            assertTrue(server.getStorageLocation(jobId1, key1).exists());
+            assertThat(server.getStorageLocation(jobId1, 
key1).exists()).isTrue();
             // delete on server so that the cache cannot re-download
-            assertTrue(server.deleteInternal(jobId1, key1));
-            assertFalse(server.getStorageLocation(jobId1, key1).exists());
+            assertThat(server.deleteInternal(jobId1, key1)).isTrue();
+            assertThat(server.getStorageLocation(jobId1, 
key1).exists()).isFalse();
             verifyDeleted(cache, jobId1, key1);
             // deleting one BLOB should not affect another BLOB with a 
different key
             // (and keys are always different now)
             verifyContents(server, jobId2, key2a, data);
             verifyContents(server, jobId2, key2b, data2);
 
             // delete first file of second job
-            assertTrue(delete(cache, jobId2, key2a));
+            assertThat(delete(cache, jobId2, key2a)).isTrue();
             // delete only works on local cache
-            assertTrue(server.getStorageLocation(jobId2, key2a).exists());
+            assertThat(server.getStorageLocation(jobId2, 
key2a).exists()).isTrue();

Review Comment:
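   Same as above: use AssertJ's file assertion here, i.e.
   `assertThat(server.getStorageLocation(jobId2, key2a)).exists();`.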



##########
flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java:
##########
@@ -89,52 +88,50 @@ public void 
stringifyingResultsShouldReportNullLocalValueAsNonnullValueString()
         final StringifiedAccumulatorResult[] results =
                 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
 
-        assertEquals(1, results.length);
+        assertThat(results).isNotEmpty();
 
         // Note the use of a String with a content of "null" rather than a 
null value
         final StringifiedAccumulatorResult firstResult = results[0];
-        assertEquals(name, firstResult.getName());
-        assertEquals("NullBearingAccumulator", firstResult.getType());
-        assertEquals("null", firstResult.getValue());
+        assertThat(firstResult.getName()).isEqualTo(name);
+        assertThat(firstResult.getType()).isEqualTo("NullBearingAccumulator");
+        assertThat(firstResult.getValue()).isEqualTo("null");
     }
 
     @Test
-    public void 
stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
+    void 
stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() {
         final String name = "a";
         final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = 
new HashMap<>();
         accumulatorMap.put(name, null);
 
         final StringifiedAccumulatorResult[] results =
                 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
 
-        assertEquals(1, results.length);
+        assertThat(results).isNotEmpty();

Review Comment:
   ```suggestion
           assertThat(results).hasSize(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java:
##########
@@ -176,47 +168,42 @@ public void testDeleteTransientAlreadyDeletedForJob() 
throws IOException {
      *
      * @param jobId job id
      */
-    private void testDeleteTransientAlreadyDeleted(@Nullable final JobID 
jobId) throws IOException {
-
-        final Configuration config = new Configuration();
-        try (BlobServer server =
-                        new BlobServer(config, temporaryFolder.newFolder(), 
new VoidBlobStore());
-                BlobCacheService cache =
-                        new BlobCacheService(
-                                config,
-                                temporaryFolder.newFolder(),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server.getPort()))) {
+    private void testDeleteTransientAlreadyDeleted(Path tempDir, @Nullable 
final JobID jobId)

Review Comment:
   Why do we need `Path tempDir` here?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java:
##########
@@ -75,29 +71,30 @@
  * <p>Successful GET requests are tested in conjunction wit the PUT requests 
by {@link
  * BlobServerPutTest}.
  */
-public class BlobServerGetTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java:
##########
@@ -317,20 +303,21 @@ public void testPermanentBlobDeferredCleanup() throws 
IOException, InterruptedEx
     }
 
     @Test
-    public void testTransientBlobNoJobCleanup() throws Exception {
-        testTransientBlobCleanup(null);
+    void testTransientBlobNoJobCleanup() throws Exception {
+        testTransientBlobCleanup(tempDir, null);
     }
 
     @Test
-    public void testTransientBlobForJobCleanup() throws Exception {
-        testTransientBlobCleanup(new JobID());
+    void testTransientBlobForJobCleanup() throws Exception {
+        testTransientBlobCleanup(tempDir, new JobID());
     }
 
     /**
      * Tests that {@link TransientBlobCache} cleans up after a default TTL and 
keeps files which are
      * constantly accessed.
      */
-    private void testTransientBlobCleanup(@Nullable final JobID jobId) throws 
Exception {
+    private void testTransientBlobCleanup(final Path tempDir, @Nullable final 
JobID jobId)

Review Comment:
   Why do we need to add the `Path tempDir` parameter?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java:
##########
@@ -315,35 +298,33 @@ private void testConcurrentDeleteOperations(@Nullable 
final JobID jobId)
 
         final byte[] data = {1, 2, 3};
 
-        try (BlobServer server =
-                        new BlobServer(config, temporaryFolder.newFolder(), 
new VoidBlobStore());
-                BlobCacheService cache =
-                        new BlobCacheService(
-                                config,
-                                temporaryFolder.newFolder(),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server.getPort()))) {
-
+        Tuple2<BlobServer, BlobCacheService> serverAndCache =
+                TestingBlobUtils.createServerAndCache(tempDir);
+        try (BlobServer server = serverAndCache.f0;
+                BlobCacheService cache = serverAndCache.f1) {
             server.start();
 
             final TransientBlobKey blobKey =
                     (TransientBlobKey) put(server, jobId, data, 
TRANSIENT_BLOB);
 
-            assertTrue(server.getStorageLocation(jobId, blobKey).exists());
+            assertThat(server.getStorageLocation(jobId, 
blobKey).exists()).isTrue();

Review Comment:
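   Same as above: use AssertJ's file assertion here, i.e.
   `assertThat(server.getStorageLocation(jobId, blobKey)).exists();`.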



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java:
##########
@@ -117,57 +109,57 @@ private void testDelete(@Nullable JobID jobId1, @Nullable 
JobID jobId2) throws I
 
             // put first BLOB
             TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, 
data, TRANSIENT_BLOB);
-            assertNotNull(key1);
+            assertThat(key1).isNotNull();
 
             // put two more BLOBs (same key, other key) for another job ID
             TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, 
data, TRANSIENT_BLOB);
-            assertNotNull(key2a);
+            assertThat(key2a).isNotNull();
             BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a);
             TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, 
data2, TRANSIENT_BLOB);
-            assertNotNull(key2b);
+            assertThat(key2b).isNotNull();
             BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b);
 
             // issue a DELETE request
-            assertTrue(delete(cache, jobId1, key1));
+            assertThat(delete(cache, jobId1, key1)).isTrue();
 
             // delete only works on local cache!
-            assertTrue(server.getStorageLocation(jobId1, key1).exists());
+            assertThat(server.getStorageLocation(jobId1, 
key1).exists()).isTrue();
             // delete on server so that the cache cannot re-download
-            assertTrue(server.deleteInternal(jobId1, key1));
-            assertFalse(server.getStorageLocation(jobId1, key1).exists());
+            assertThat(server.deleteInternal(jobId1, key1)).isTrue();
+            assertThat(server.getStorageLocation(jobId1, 
key1).exists()).isFalse();

Review Comment:
   ```suggestion
               assertThat(server.getStorageLocation(jobId1, 
key1)).doesNotExist();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerSslTest.java:
##########
@@ -20,22 +20,22 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.File;
 import java.io.IOException;
 
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Testing a {@link BlobServer} would fail with improper SSL config. */
-public class BlobServerSSLTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java:
##########
@@ -117,57 +109,57 @@ private void testDelete(@Nullable JobID jobId1, @Nullable 
JobID jobId2) throws I
 
             // put first BLOB
             TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, 
data, TRANSIENT_BLOB);
-            assertNotNull(key1);
+            assertThat(key1).isNotNull();
 
             // put two more BLOBs (same key, other key) for another job ID
             TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, 
data, TRANSIENT_BLOB);
-            assertNotNull(key2a);
+            assertThat(key2a).isNotNull();
             BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a);
             TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, 
data2, TRANSIENT_BLOB);
-            assertNotNull(key2b);
+            assertThat(key2b).isNotNull();
             BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b);
 
             // issue a DELETE request
-            assertTrue(delete(cache, jobId1, key1));
+            assertThat(delete(cache, jobId1, key1)).isTrue();
 
             // delete only works on local cache!
-            assertTrue(server.getStorageLocation(jobId1, key1).exists());
+            assertThat(server.getStorageLocation(jobId1, 
key1).exists()).isTrue();
             // delete on server so that the cache cannot re-download
-            assertTrue(server.deleteInternal(jobId1, key1));
-            assertFalse(server.getStorageLocation(jobId1, key1).exists());
+            assertThat(server.deleteInternal(jobId1, key1)).isTrue();
+            assertThat(server.getStorageLocation(jobId1, 
key1).exists()).isFalse();
             verifyDeleted(cache, jobId1, key1);
             // deleting one BLOB should not affect another BLOB with a 
different key
             // (and keys are always different now)
             verifyContents(server, jobId2, key2a, data);
             verifyContents(server, jobId2, key2b, data2);
 
             // delete first file of second job
-            assertTrue(delete(cache, jobId2, key2a));
+            assertThat(delete(cache, jobId2, key2a)).isTrue();
             // delete only works on local cache
-            assertTrue(server.getStorageLocation(jobId2, key2a).exists());
+            assertThat(server.getStorageLocation(jobId2, 
key2a).exists()).isTrue();
             // delete on server so that the cache cannot re-download
-            assertTrue(server.deleteInternal(jobId2, key2a));
+            assertThat(server.deleteInternal(jobId2, key2a)).isTrue();
             verifyDeleted(cache, jobId2, key2a);
             verifyContents(server, jobId2, key2b, data2);
 
             // delete second file of second job
-            assertTrue(delete(cache, jobId2, key2b));
+            assertThat(delete(cache, jobId2, key2b)).isTrue();
             // delete only works on local cache
-            assertTrue(server.getStorageLocation(jobId2, key2b).exists());
+            assertThat(server.getStorageLocation(jobId2, 
key2b).exists()).isTrue();

Review Comment:
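   Same as above: use AssertJ's file assertion here, i.e.
   `assertThat(server.getStorageLocation(jobId2, key2b)).exists();`.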



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java:
##########
@@ -239,45 +222,42 @@ private void testGetFailsIncoming(@Nullable final JobID 
jobId, BlobKey.BlobType
                 tempFileDir =
                         
cache.getTransientBlobService().createTemporaryFilename().getParentFile();
             }
-            assertTrue(tempFileDir.setExecutable(true, false));
-            assertTrue(tempFileDir.setReadable(true, false));
-            assertTrue(tempFileDir.setWritable(false, false));
-
-            // request the file from the server via the cache
-            exception.expect(IOException.class);
-            exception.expectMessage("Failed to fetch BLOB ");
+            assertThat(tempFileDir.setExecutable(true, false)).isTrue();
+            assertThat(tempFileDir.setReadable(true, false)).isTrue();
+            assertThat(tempFileDir.setWritable(false, false)).isTrue();
 
             try {
-                get(cache, jobId, blobKey);
+                assertThatThrownBy(() -> get(cache, jobId, blobKey))
+                        .isInstanceOf(IOException.class)
+                        .hasMessageStartingWith("Failed to fetch BLOB");
             } finally {
                 HashSet<String> expectedDirs = new HashSet<>();
                 expectedDirs.add("incoming");
+
                 if (jobId != null) {
                     // only the incoming and job directory should exist (no 
job directory!)
                     expectedDirs.add(JOB_DIR_PREFIX + jobId);
-                    File storageDir = tempFileDir.getParentFile();
-                    String[] actualDirs = storageDir.list();
-                    assertNotNull(actualDirs);
-                    assertEquals(expectedDirs, new 
HashSet<>(Arrays.asList(actualDirs)));
 
                     // job directory should be empty
                     File jobDir = new File(tempFileDir.getParentFile(), 
JOB_DIR_PREFIX + jobId);
-                    assertArrayEquals(new String[] {}, jobDir.list());
+                    assertThat(jobDir.list()).isEmpty();
                 } else {
                     // only the incoming and no_job directory should exist (no 
job directory!)
                     expectedDirs.add(NO_JOB_DIR_PREFIX);
-                    File storageDir = tempFileDir.getParentFile();
-                    String[] actualDirs = storageDir.list();
-                    assertNotNull(actualDirs);
-                    assertEquals(expectedDirs, new 
HashSet<>(Arrays.asList(actualDirs)));
 
                     // no_job directory should be empty
                     File noJobDir = new File(tempFileDir.getParentFile(), 
NO_JOB_DIR_PREFIX);
-                    assertArrayEquals(new String[] {}, noJobDir.list());
+                    assertThat(noJobDir.list()).isEmpty();
                 }
 
+                File storageDir = tempFileDir.getParentFile();
+                String[] actualDirs = storageDir.list();
+                assertThat(actualDirs).isNotNull();
+                assertThat(actualDirs).isNotEmpty();
+                assertThat(new 
HashSet<>(Arrays.asList(actualDirs))).isEqualTo(expectedDirs);
+
                 // file should still be there on the server (even if transient)
-                assertTrue(server.getStorageLocation(jobId, blobKey).exists());
+                assertThat(server.getStorageLocation(jobId, 
blobKey).exists()).isTrue();

Review Comment:
   ```suggestion
                   assertThat(server.getStorageLocation(jobId, 
blobKey)).exists();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java:
##########
@@ -362,7 +343,7 @@ private void testConcurrentDeleteOperations(@Nullable final 
JobID jobId)
             waitFuture.get();
 
             // delete only works on local cache!
-            assertTrue(server.getStorageLocation(jobId, blobKey).exists());
+            assertThat(server.getStorageLocation(jobId, 
blobKey).exists()).isTrue();

Review Comment:
   ```suggestion
               assertThat(server.getStorageLocation(jobId, blobKey)).exists();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java:
##########
@@ -79,18 +77,17 @@
  * Tests for successful and failing PUT operations against the BLOB server, 
and successful GET
  * operations.
  */
-public class BlobCachePutTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java:
##########
@@ -315,35 +298,33 @@ private void testConcurrentDeleteOperations(@Nullable 
final JobID jobId)
 
         final byte[] data = {1, 2, 3};
 
-        try (BlobServer server =
-                        new BlobServer(config, temporaryFolder.newFolder(), 
new VoidBlobStore());
-                BlobCacheService cache =
-                        new BlobCacheService(
-                                config,
-                                temporaryFolder.newFolder(),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server.getPort()))) {
-
+        Tuple2<BlobServer, BlobCacheService> serverAndCache =
+                TestingBlobUtils.createServerAndCache(tempDir);
+        try (BlobServer server = serverAndCache.f0;
+                BlobCacheService cache = serverAndCache.f1) {
             server.start();
 
             final TransientBlobKey blobKey =
                     (TransientBlobKey) put(server, jobId, data, 
TRANSIENT_BLOB);
 
-            assertTrue(server.getStorageLocation(jobId, blobKey).exists());
+            assertThat(server.getStorageLocation(jobId, 
blobKey).exists()).isTrue();
 
             for (int i = 0; i < concurrentDeleteOperations; i++) {
                 CompletableFuture<Void> deleteFuture =
                         CompletableFuture.supplyAsync(
                                 () -> {
                                     try {
-                                        assertTrue(delete(cache, jobId, 
blobKey));
-                                        assertFalse(
-                                                cache.getTransientBlobService()
-                                                        
.getStorageLocation(jobId, blobKey)
-                                                        .exists());
+                                        assertThat(delete(cache, jobId, 
blobKey)).isTrue();
+                                        assertThat(
+                                                        
cache.getTransientBlobService()
+                                                                
.getStorageLocation(jobId, blobKey)
+                                                                .exists())
+                                                .isFalse();

Review Comment:
   ```suggestion
                                           
assertThat(cache.getTransientBlobService()
                                                                   
.getStorageLocation(jobId, blobKey))
                                                   .doesNotExist();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java:
##########
@@ -384,59 +359,53 @@ private void testGetFailsStore(@Nullable final JobID 
jobId, BlobKey.BlobType blo
      * the file. File transfers should fail.
      */
     @Test
-    public void testGetFailsHaStoreForJobHa() throws IOException {
+    void testGetFailsHaStoreForJobHa() throws IOException {
         final JobID jobId = new JobID();
 
-        final Configuration config = new Configuration();
-        try (BlobServer server =
-                        new BlobServer(config, temporaryFolder.newFolder(), 
new VoidBlobStore());
-                BlobCacheService cache =
-                        new BlobCacheService(
-                                config,
-                                temporaryFolder.newFolder(),
-                                new VoidBlobStore(),
-                                new InetSocketAddress("localhost", 
server.getPort()))) {
+        Tuple2<BlobServer, BlobCacheService> serverAndCache =
+                TestingBlobUtils.createServerAndCache(tempDir);
 
+        try (BlobServer server = serverAndCache.f0;
+                BlobCacheService cache = serverAndCache.f1) {
             server.start();
 
             // store the data on the server (and blobStore), remove from local 
server store
             byte[] data = new byte[2000000];
             rnd.nextBytes(data);
             PermanentBlobKey blobKey = (PermanentBlobKey) put(server, jobId, 
data, PERMANENT_BLOB);
-            assertTrue(server.getStorageLocation(jobId, blobKey).delete());
+            assertThat(server.getStorageLocation(jobId, 
blobKey).delete()).isTrue();
 
             File tempFileDir = 
server.createTemporaryFilename().getParentFile();
 
-            // request the file from the server via the cache
-            exception.expect(IOException.class);
-            exception.expectMessage("Failed to fetch BLOB ");
-
             try {
-                get(cache, jobId, blobKey);
+                assertThatThrownBy(() -> get(cache, jobId, blobKey))
+                        .isInstanceOf(IOException.class)
+                        .hasMessageStartingWith("Failed to fetch BLOB");
             } finally {
                 HashSet<String> expectedDirs = new HashSet<>();
                 expectedDirs.add("incoming");
                 expectedDirs.add(JOB_DIR_PREFIX + jobId);
                 // only the incoming and job directory should exist (no job 
directory!)
                 File storageDir = tempFileDir.getParentFile();
                 String[] actualDirs = storageDir.list();
-                assertNotNull(actualDirs);
-                assertEquals(expectedDirs, new 
HashSet<>(Arrays.asList(actualDirs)));
+                assertThat(actualDirs).isNotNull();
+                assertThat(actualDirs).isNotEmpty();
+                assertThat(new 
HashSet<>(Arrays.asList(actualDirs))).isEqualTo(expectedDirs);

Review Comment:
   ```suggestion
                   assertThat(actualDirs).containsOnlyElementsOf(expectedDirs);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java:
##########
@@ -21,31 +21,23 @@
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.StringUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** This class contains unit tests for the {@link BlobKey} class. */
-public final class BlobKeyTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java:
##########
@@ -345,29 +322,27 @@ private void testGetFailsStore(@Nullable final JobID 
jobId, BlobKey.BlobType blo
                                 .getStorageLocation(jobId, new 
TransientBlobKey())
                                 .getParentFile();
             }
-            assertTrue(jobStoreDir.setExecutable(true, false));
-            assertTrue(jobStoreDir.setReadable(true, false));
-            assertTrue(jobStoreDir.setWritable(false, false));
-
-            // request the file from the server via the cache
-            exception.expect(AccessDeniedException.class);
+            assertThat(jobStoreDir.setExecutable(true, false)).isTrue();
+            assertThat(jobStoreDir.setReadable(true, false)).isTrue();
+            assertThat(jobStoreDir.setWritable(false, false)).isTrue();
 
             try {
-                get(cache, jobId, blobKey);
+                assertThatThrownBy(() -> get(cache, jobId, blobKey))
+                        .isInstanceOf(AccessDeniedException.class);
             } finally {
                 // there should be no remaining incoming files
                 File incomingFileDir = new File(jobStoreDir.getParent(), 
"incoming");
-                assertArrayEquals(new String[] {}, incomingFileDir.list());
+                assertThat(incomingFileDir.list()).isEmpty();
 
                 // there should be no files in the job directory
-                assertArrayEquals(new String[] {}, jobStoreDir.list());
+                assertThat(jobStoreDir.list()).isEmpty();
 
                 // if transient, the get will fail but since the download was 
successful, the file
                 // will not be on the server anymore
                 if (blobType == TRANSIENT_BLOB) {
                     verifyDeletedEventually(server, jobId, blobKey);
                 } else {
-                    assertTrue(server.getStorageLocation(jobId, 
blobKey).exists());
+                    assertThat(server.getStorageLocation(jobId, 
blobKey).exists()).isTrue();

Review Comment:
   ```suggestion
                       assertThat(server.getStorageLocation(jobId, 
blobKey)).exists();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java:
##########
@@ -18,113 +18,53 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.core.testutils.FlinkAssertions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.apache.commons.io.FileUtils;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Random;
-
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
-import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
-import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import java.nio.file.Path;
 
 /**
  * Tests how GET requests react to corrupt files when downloaded via a {@link 
BlobServer}.
  *
  * <p>Successful GET requests are tested in conjunction wit the PUT requests.
  */
-public class BlobServerCorruptionTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java:
##########
@@ -19,36 +19,38 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.nio.file.Path;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for the blob cache retrying the connection to the server. */
-public class BlobCacheRetriesTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java:
##########
@@ -407,7 +385,7 @@ private void testConcurrentDeleteOperations(@Nullable final 
JobID jobId)
             // in case of no lock, one of the delete operations should 
eventually fail
             waitFuture.get();
 
-            assertFalse(server.getStorageLocation(jobId, blobKey).exists());
+            assertThat(server.getStorageLocation(jobId, 
blobKey).exists()).isFalse();

Review Comment:
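   Use AssertJ's file assertion `doesNotExist` here instead of asserting on `exists()`:
   `assertThat(server.getStorageLocation(jobId, blobKey)).doesNotExist();`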



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java:
##########
@@ -369,24 +347,24 @@ private void testConcurrentDeleteOperations(@Nullable 
final JobID jobId)
 
         final byte[] data = {1, 2, 3};
 
-        try (final BlobServer server =
-                new BlobServer(config, temporaryFolder.newFolder(), new 
VoidBlobStore())) {
-
+        try (BlobServer server = TestingBlobUtils.createServer(tempDir)) {
             server.start();
 
             final TransientBlobKey blobKey =
                     (TransientBlobKey) put(server, jobId, data, 
TRANSIENT_BLOB);
 
-            assertTrue(server.getStorageLocation(jobId, blobKey).exists());
+            assertThat(server.getStorageLocation(jobId, 
blobKey).exists()).isTrue();
 
             for (int i = 0; i < concurrentDeleteOperations; i++) {
                 CompletableFuture<Void> deleteFuture =
                         CompletableFuture.supplyAsync(
                                 () -> {
                                     try {
-                                        assertTrue(delete(server, jobId, 
blobKey));
-                                        assertFalse(
-                                                
server.getStorageLocation(jobId, blobKey).exists());
+                                        assertThat(delete(server, jobId, 
blobKey)).isTrue();
+                                        assertThat(
+                                                        
server.getStorageLocation(jobId, blobKey)
+                                                                .exists())
+                                                .isFalse();

Review Comment:
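   Use AssertJ's file assertion `doesNotExist` here instead of `exists()` followed by `isFalse()`:
   `assertThat(server.getStorageLocation(jobId, blobKey)).doesNotExist();`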



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java:
##########
@@ -75,29 +71,30 @@
  * <p>Successful GET requests are tested in conjunction wit the PUT requests 
by {@link
  * BlobServerPutTest}.
  */
-public class BlobServerGetTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class BlobServerGetTest {

Review Comment:
   ```suggestion
   class BlobServerGetTest {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java:
##########
@@ -20,73 +20,73 @@
 
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.nio.file.Path;
 
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests to ensure that the BlobServer properly starts on a specified range 
of available ports. */
-public class BlobServerRangeTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheTest.java:
##########
@@ -40,10 +40,10 @@
 
 /** Test for the {@link PermanentBlobCache}. */
 @ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java:
##########
@@ -65,24 +63,19 @@
 import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /**
  * Tests for successful and failing PUT operations against the BLOB server, 
and successful GET
  * operations.
  */
-public class BlobServerPutTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsNonWritableTest.java:
##########
@@ -22,78 +22,81 @@
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** Tests for {@link BlobUtils} working on non-writable directories. */
-public class BlobUtilsNonWritableTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to