Jiabao-Sun commented on code in PR #23211: URL: https://github.com/apache/flink/pull/23211#discussion_r1298315587
########## flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java: ########## @@ -71,16 +70,16 @@ public void stringifyingResultsShouldIncorporateAccumulatorLocalValueDirectly() final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); - assertEquals(1, results.length); + assertThat(results).isNotEmpty(); Review Comment: ```suggestion assertThat(results).hasSize(1); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java: ########## @@ -89,52 +88,50 @@ public void stringifyingResultsShouldReportNullLocalValueAsNonnullValueString() final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); - assertEquals(1, results.length); + assertThat(results).isNotEmpty(); Review Comment: ```suggestion assertThat(results).hasSize(1); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java: ########## @@ -89,52 +88,50 @@ public void stringifyingResultsShouldReportNullLocalValueAsNonnullValueString() final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); - assertEquals(1, results.length); + assertThat(results).isNotEmpty(); // Note the use of a String with a content of "null" rather than a null value final StringifiedAccumulatorResult firstResult = results[0]; - assertEquals(name, firstResult.getName()); - assertEquals("NullBearingAccumulator", firstResult.getType()); - assertEquals("null", firstResult.getValue()); + assertThat(firstResult.getName()).isEqualTo(name); + assertThat(firstResult.getType()).isEqualTo("NullBearingAccumulator"); + assertThat(firstResult.getValue()).isEqualTo("null"); } @Test - public void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() { + void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() { final String name = "a"; final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>(); accumulatorMap.put(name, null); final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); - assertEquals(1, results.length); + assertThat(results).isNotEmpty(); // Note the use of String values with content of "null" rather than null values final StringifiedAccumulatorResult firstResult = results[0]; - assertEquals(name, firstResult.getName()); - assertEquals("null", firstResult.getType()); - assertEquals("null", firstResult.getValue()); + assertThat(firstResult.getName()).isEqualTo(name); + assertThat(firstResult.getType()).isEqualTo("null"); + assertThat(firstResult.getValue()).isEqualTo("null"); } @Test - public void stringifyingFailureResults() { + void stringifyingFailureResults() { final String name = "a"; final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>(); accumulatorMap.put(name, OptionalFailure.ofFailure(new FlinkRuntimeException("Test"))); final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); - assertEquals(1, results.length); + assertThat(results).isNotEmpty(); Review Comment: ```suggestion assertThat(results).hasSize(1); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java: ########## @@ -117,57 +109,57 @@ private void testDelete(@Nullable JobID jobId1, @Nullable JobID jobId2) throws I // put first BLOB TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, data, TRANSIENT_BLOB); - assertNotNull(key1); + assertThat(key1).isNotNull(); // put two more BLOBs (same key, other key) for another job ID TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB); - assertNotNull(key2a); + assertThat(key2a).isNotNull(); BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a); TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB); - assertNotNull(key2b); + assertThat(key2b).isNotNull(); BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b); // issue a DELETE request - assertTrue(delete(cache, jobId1, key1)); + assertThat(delete(cache, jobId1, key1)).isTrue(); // delete only works on local cache! - assertTrue(server.getStorageLocation(jobId1, key1).exists()); + assertThat(server.getStorageLocation(jobId1, key1).exists()).isTrue(); Review Comment: ```suggestion assertThat(server.getStorageLocation(jobId1, key1)).exists(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java: ########## @@ -83,31 +77,30 @@ * <p>Most successful GET requests are tested in conjunction wit the PUT requests by {@link * BlobCachePutTest}. */ -public class BlobCacheGetTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java: ########## @@ -315,35 +298,33 @@ private void testConcurrentDeleteOperations(@Nullable final JobID jobId) final byte[] data = {1, 2, 3}; - try (BlobServer server = - new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore()); - BlobCacheService cache = - new BlobCacheService( - config, - temporaryFolder.newFolder(), - new VoidBlobStore(), - new InetSocketAddress("localhost", server.getPort()))) { - + Tuple2<BlobServer, BlobCacheService> serverAndCache = + TestingBlobUtils.createServerAndCache(tempDir); + try (BlobServer server = serverAndCache.f0; + BlobCacheService cache = serverAndCache.f1) { server.start(); final TransientBlobKey blobKey = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB); - assertTrue(server.getStorageLocation(jobId, blobKey).exists()); + assertThat(server.getStorageLocation(jobId, blobKey).exists()).isTrue(); for (int i = 0; i < concurrentDeleteOperations; i++) { CompletableFuture<Void> deleteFuture = CompletableFuture.supplyAsync( () -> { try { - assertTrue(delete(cache, jobId, blobKey)); - assertFalse( - cache.getTransientBlobService() - .getStorageLocation(jobId, blobKey) - .exists()); + assertThat(delete(cache, jobId, blobKey)).isTrue(); + assertThat( + cache.getTransientBlobService() + .getStorageLocation(jobId, blobKey) + .exists()) + .isFalse(); // delete only works on local cache! - assertTrue( - server.getStorageLocation(jobId, blobKey).exists()); + assertThat( + server.getStorageLocation(jobId, blobKey) + .exists()) + .isTrue(); Review Comment: ```suggestion assertThat(server.getStorageLocation(jobId, blobKey)) .exists(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java: ########## @@ -49,17 +48,13 @@ import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeFalse; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; /** This class contains unit tests for the {@link BlobClient}. */ -public class BlobClientTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java: ########## @@ -18,135 +18,43 @@ package org.apache.flink.runtime.blob; -import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.Random; - -import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; -import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; -import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashDifferent; -import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; -import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted; -import static org.apache.flink.runtime.blob.BlobServerPutTest.put; -import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents; -import static org.junit.Assert.assertTrue; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; /** Tests for the recovery of files of a {@link BlobCacheService} from a HA store. */ -public class BlobCacheRecoveryTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java: ########## @@ -58,12 +59,15 @@ import static org.apache.flink.runtime.blob.BlobServerGetTest.get; import static org.apache.flink.runtime.blob.BlobServerPutTest.put; import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents; +import static org.apache.flink.runtime.blob.TestingBlobHelpers.checkFileCountForJob; +import static org.apache.flink.runtime.blob.TestingBlobHelpers.checkFilesExist; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; /** A few tests for the cleanup of transient BLOBs at the {@link BlobServer}. */ -public class BlobServerCleanupTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java: ########## @@ -21,29 +21,28 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.blob.BlobKey.BlobType; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.ArrayList; import java.util.List; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link BlobCacheSizeTracker}. */ -public class BlobCacheSizeTrackerTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java: ########## @@ -21,57 +21,45 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.core.testutils.FlinkAssertions; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.apache.commons.io.FileUtils; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; -import javax.annotation.Nullable; - -import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.file.Files; -import java.util.Arrays; -import java.util.Random; +import java.nio.file.Path; import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; -import static org.apache.flink.runtime.blob.BlobServerGetTest.get; -import static org.apache.flink.runtime.blob.BlobServerPutTest.put; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; /** * Tests how GET requests react to corrupt files when downloaded via a {@link BlobCacheService}. * * <p>Successful GET requests are tested in conjunction wit the PUT requests. */ -public class BlobCacheCorruptionTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java: ########## @@ -117,57 +109,57 @@ private void testDelete(@Nullable JobID jobId1, @Nullable JobID jobId2) throws I // put first BLOB TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, data, TRANSIENT_BLOB); - assertNotNull(key1); + assertThat(key1).isNotNull(); // put two more BLOBs (same key, other key) for another job ID TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB); - assertNotNull(key2a); + assertThat(key2a).isNotNull(); BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a); TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB); - assertNotNull(key2b); + assertThat(key2b).isNotNull(); BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b); // issue a DELETE request - assertTrue(delete(cache, jobId1, key1)); + assertThat(delete(cache, jobId1, key1)).isTrue(); // delete only works on local cache! - assertTrue(server.getStorageLocation(jobId1, key1).exists()); + assertThat(server.getStorageLocation(jobId1, key1).exists()).isTrue(); // delete on server so that the cache cannot re-download - assertTrue(server.deleteInternal(jobId1, key1)); - assertFalse(server.getStorageLocation(jobId1, key1).exists()); + assertThat(server.deleteInternal(jobId1, key1)).isTrue(); + assertThat(server.getStorageLocation(jobId1, key1).exists()).isFalse(); verifyDeleted(cache, jobId1, key1); // deleting one BLOB should not affect another BLOB with a different key // (and keys are always different now) verifyContents(server, jobId2, key2a, data); verifyContents(server, jobId2, key2b, data2); // delete first file of second job - assertTrue(delete(cache, jobId2, key2a)); + assertThat(delete(cache, jobId2, key2a)).isTrue(); // delete only works on local cache - assertTrue(server.getStorageLocation(jobId2, key2a).exists()); + assertThat(server.getStorageLocation(jobId2, key2a).exists()).isTrue(); Review Comment: Same as above. ########## flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java: ########## @@ -89,52 +88,50 @@ public void stringifyingResultsShouldReportNullLocalValueAsNonnullValueString() final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); - assertEquals(1, results.length); + assertThat(results).isNotEmpty(); // Note the use of a String with a content of "null" rather than a null value final StringifiedAccumulatorResult firstResult = results[0]; - assertEquals(name, firstResult.getName()); - assertEquals("NullBearingAccumulator", firstResult.getType()); - assertEquals("null", firstResult.getValue()); + assertThat(firstResult.getName()).isEqualTo(name); + assertThat(firstResult.getType()).isEqualTo("NullBearingAccumulator"); + assertThat(firstResult.getValue()).isEqualTo("null"); } @Test - public void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() { + void stringifyingResultsShouldReportNullAccumulatorWithNonnullValueAndTypeString() { final String name = "a"; final Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = new HashMap<>(); accumulatorMap.put(name, null); final StringifiedAccumulatorResult[] results = StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); - assertEquals(1, results.length); + assertThat(results).isNotEmpty(); Review Comment: ```suggestion assertThat(results).hasSize(1); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java: ########## @@ -176,47 +168,42 @@ public void testDeleteTransientAlreadyDeletedForJob() throws IOException { * * @param jobId job id */ - private void testDeleteTransientAlreadyDeleted(@Nullable final JobID jobId) throws IOException { - - final Configuration config = new Configuration(); - try (BlobServer server = - new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore()); - BlobCacheService cache = - new BlobCacheService( - config, - temporaryFolder.newFolder(), - new VoidBlobStore(), - new InetSocketAddress("localhost", server.getPort()))) { + private void testDeleteTransientAlreadyDeleted(Path tempDir, @Nullable final JobID jobId) Review Comment: Why we need `Path tempDir` here? ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java: ########## @@ -75,29 +71,30 @@ * <p>Successful GET requests are tested in conjunction wit the PUT requests by {@link * BlobServerPutTest}. */ -public class BlobServerGetTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java: ########## @@ -317,20 +303,21 @@ public void testPermanentBlobDeferredCleanup() throws IOException, InterruptedEx } @Test - public void testTransientBlobNoJobCleanup() throws Exception { - testTransientBlobCleanup(null); + void testTransientBlobNoJobCleanup() throws Exception { + testTransientBlobCleanup(tempDir, null); } @Test - public void testTransientBlobForJobCleanup() throws Exception { - testTransientBlobCleanup(new JobID()); + void testTransientBlobForJobCleanup() throws Exception { + testTransientBlobCleanup(tempDir, new JobID()); } /** * Tests that {@link TransientBlobCache} cleans up after a default TTL and keeps files which are * constantly accessed. */ - private void testTransientBlobCleanup(@Nullable final JobID jobId) throws Exception { + private void testTransientBlobCleanup(final Path tempDir, @Nullable final JobID jobId) Review Comment: Why do we need to add `Path tempDir` parameter? ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java: ########## @@ -315,35 +298,33 @@ private void testConcurrentDeleteOperations(@Nullable final JobID jobId) final byte[] data = {1, 2, 3}; - try (BlobServer server = - new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore()); - BlobCacheService cache = - new BlobCacheService( - config, - temporaryFolder.newFolder(), - new VoidBlobStore(), - new InetSocketAddress("localhost", server.getPort()))) { - + Tuple2<BlobServer, BlobCacheService> serverAndCache = + TestingBlobUtils.createServerAndCache(tempDir); + try (BlobServer server = serverAndCache.f0; + BlobCacheService cache = serverAndCache.f1) { server.start(); final TransientBlobKey blobKey = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB); - assertTrue(server.getStorageLocation(jobId, blobKey).exists()); + assertThat(server.getStorageLocation(jobId, blobKey).exists()).isTrue(); Review Comment: Same as above. ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java: ########## @@ -117,57 +109,57 @@ private void testDelete(@Nullable JobID jobId1, @Nullable JobID jobId2) throws I // put first BLOB TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, data, TRANSIENT_BLOB); - assertNotNull(key1); + assertThat(key1).isNotNull(); // put two more BLOBs (same key, other key) for another job ID TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB); - assertNotNull(key2a); + assertThat(key2a).isNotNull(); BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a); TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB); - assertNotNull(key2b); + assertThat(key2b).isNotNull(); BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b); // issue a DELETE request - assertTrue(delete(cache, jobId1, key1)); + assertThat(delete(cache, jobId1, key1)).isTrue(); // delete only works on local cache! - assertTrue(server.getStorageLocation(jobId1, key1).exists()); + assertThat(server.getStorageLocation(jobId1, key1).exists()).isTrue(); // delete on server so that the cache cannot re-download - assertTrue(server.deleteInternal(jobId1, key1)); - assertFalse(server.getStorageLocation(jobId1, key1).exists()); + assertThat(server.deleteInternal(jobId1, key1)).isTrue(); + assertThat(server.getStorageLocation(jobId1, key1).exists()).isFalse(); Review Comment: ```suggestion assertThat(server.getStorageLocation(jobId1, key1)).doesNotExist(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerSslTest.java: ########## @@ -20,22 +20,22 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import java.io.File; import java.io.IOException; -import static org.apache.flink.util.ExceptionUtils.findThrowable; -import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Testing a {@link BlobServer} would fail with improper SSL config. */ -public class BlobServerSSLTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java: ########## @@ -117,57 +109,57 @@ private void testDelete(@Nullable JobID jobId1, @Nullable JobID jobId2) throws I // put first BLOB TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, data, TRANSIENT_BLOB); - assertNotNull(key1); + assertThat(key1).isNotNull(); // put two more BLOBs (same key, other key) for another job ID TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB); - assertNotNull(key2a); + assertThat(key2a).isNotNull(); BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a); TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB); - assertNotNull(key2b); + assertThat(key2b).isNotNull(); BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b); // issue a DELETE request - assertTrue(delete(cache, jobId1, key1)); + assertThat(delete(cache, jobId1, key1)).isTrue(); // delete only works on local cache! - assertTrue(server.getStorageLocation(jobId1, key1).exists()); + assertThat(server.getStorageLocation(jobId1, key1).exists()).isTrue(); // delete on server so that the cache cannot re-download - assertTrue(server.deleteInternal(jobId1, key1)); - assertFalse(server.getStorageLocation(jobId1, key1).exists()); + assertThat(server.deleteInternal(jobId1, key1)).isTrue(); + assertThat(server.getStorageLocation(jobId1, key1).exists()).isFalse(); verifyDeleted(cache, jobId1, key1); // deleting one BLOB should not affect another BLOB with a different key // (and keys are always different now) verifyContents(server, jobId2, key2a, data); verifyContents(server, jobId2, key2b, data2); // delete first file of second job - assertTrue(delete(cache, jobId2, key2a)); + assertThat(delete(cache, jobId2, key2a)).isTrue(); // delete only works on local cache - assertTrue(server.getStorageLocation(jobId2, key2a).exists()); + assertThat(server.getStorageLocation(jobId2, key2a).exists()).isTrue(); // delete on server so that the cache cannot re-download - assertTrue(server.deleteInternal(jobId2, key2a)); + assertThat(server.deleteInternal(jobId2, key2a)).isTrue(); verifyDeleted(cache, jobId2, key2a); verifyContents(server, jobId2, key2b, data2); // delete second file of second job - assertTrue(delete(cache, jobId2, key2b)); + assertThat(delete(cache, jobId2, key2b)).isTrue(); // delete only works on local cache - assertTrue(server.getStorageLocation(jobId2, key2b).exists()); + assertThat(server.getStorageLocation(jobId2, key2b).exists()).isTrue(); Review Comment: Same as above. ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java: ########## @@ -239,45 +222,42 @@ private void testGetFailsIncoming(@Nullable final JobID jobId, BlobKey.BlobType tempFileDir = cache.getTransientBlobService().createTemporaryFilename().getParentFile(); } - assertTrue(tempFileDir.setExecutable(true, false)); - assertTrue(tempFileDir.setReadable(true, false)); - assertTrue(tempFileDir.setWritable(false, false)); - - // request the file from the server via the cache - exception.expect(IOException.class); - exception.expectMessage("Failed to fetch BLOB "); + assertThat(tempFileDir.setExecutable(true, false)).isTrue(); + assertThat(tempFileDir.setReadable(true, false)).isTrue(); + assertThat(tempFileDir.setWritable(false, false)).isTrue(); try { - get(cache, jobId, blobKey); + assertThatThrownBy(() -> get(cache, jobId, blobKey)) + .isInstanceOf(IOException.class) + .hasMessageStartingWith("Failed to fetch BLOB"); } finally { HashSet<String> expectedDirs = new HashSet<>(); expectedDirs.add("incoming"); + if (jobId != null) { // only the incoming and job directory should exist (no job directory!) expectedDirs.add(JOB_DIR_PREFIX + jobId); - File storageDir = tempFileDir.getParentFile(); - String[] actualDirs = storageDir.list(); - assertNotNull(actualDirs); - assertEquals(expectedDirs, new HashSet<>(Arrays.asList(actualDirs))); // job directory should be empty File jobDir = new File(tempFileDir.getParentFile(), JOB_DIR_PREFIX + jobId); - assertArrayEquals(new String[] {}, jobDir.list()); + assertThat(jobDir.list()).isEmpty(); } else { // only the incoming and no_job directory should exist (no job directory!) expectedDirs.add(NO_JOB_DIR_PREFIX); - File storageDir = tempFileDir.getParentFile(); - String[] actualDirs = storageDir.list(); - assertNotNull(actualDirs); - assertEquals(expectedDirs, new HashSet<>(Arrays.asList(actualDirs))); // no_job directory should be empty File noJobDir = new File(tempFileDir.getParentFile(), NO_JOB_DIR_PREFIX); - assertArrayEquals(new String[] {}, noJobDir.list()); + assertThat(noJobDir.list()).isEmpty(); } + File storageDir = tempFileDir.getParentFile(); + String[] actualDirs = storageDir.list(); + assertThat(actualDirs).isNotNull(); + assertThat(actualDirs).isNotEmpty(); + assertThat(new HashSet<>(Arrays.asList(actualDirs))).isEqualTo(expectedDirs); + // file should still be there on the server (even if transient) - assertTrue(server.getStorageLocation(jobId, blobKey).exists()); + assertThat(server.getStorageLocation(jobId, blobKey).exists()).isTrue(); Review Comment: ```suggestion assertThat(server.getStorageLocation(jobId, blobKey)).exists(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java: ########## @@ -362,7 +343,7 @@ private void testConcurrentDeleteOperations(@Nullable final JobID jobId) waitFuture.get(); // delete only works on local cache! - assertTrue(server.getStorageLocation(jobId, blobKey).exists()); + assertThat(server.getStorageLocation(jobId, blobKey).exists()).isTrue(); Review Comment: ```suggestion assertThat(server.getStorageLocation(jobId, blobKey)).exists(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java: ########## @@ -79,18 +77,17 @@ * Tests for successful and failing PUT operations against the BLOB server, and successful GET * operations. */ -public class BlobCachePutTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java: ########## @@ -315,35 +298,33 @@ private void testConcurrentDeleteOperations(@Nullable final JobID jobId) final byte[] data = {1, 2, 3}; - try (BlobServer server = - new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore()); - BlobCacheService cache = - new BlobCacheService( - config, - temporaryFolder.newFolder(), - new VoidBlobStore(), - new InetSocketAddress("localhost", server.getPort()))) { - + Tuple2<BlobServer, BlobCacheService> serverAndCache = + TestingBlobUtils.createServerAndCache(tempDir); + try (BlobServer server = serverAndCache.f0; + BlobCacheService cache = serverAndCache.f1) { server.start(); final TransientBlobKey blobKey = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB); - assertTrue(server.getStorageLocation(jobId, blobKey).exists()); + assertThat(server.getStorageLocation(jobId, blobKey).exists()).isTrue(); for (int i = 0; i < concurrentDeleteOperations; i++) { CompletableFuture<Void> deleteFuture = CompletableFuture.supplyAsync( () -> { try { - assertTrue(delete(cache, jobId, blobKey)); - assertFalse( - cache.getTransientBlobService() - .getStorageLocation(jobId, blobKey) - .exists()); + assertThat(delete(cache, jobId, blobKey)).isTrue(); + assertThat( + cache.getTransientBlobService() + .getStorageLocation(jobId, blobKey) + .exists()) + .isFalse(); Review Comment: ```suggestion assertThat(cache.getTransientBlobService() .getStorageLocation(jobId, blobKey)) .doesNotExist(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java: ########## @@ -384,59 +359,53 @@ private void testGetFailsStore(@Nullable final JobID jobId, BlobKey.BlobType blo * the file. File transfers should fail. */ @Test - public void testGetFailsHaStoreForJobHa() throws IOException { + void testGetFailsHaStoreForJobHa() throws IOException { final JobID jobId = new JobID(); - final Configuration config = new Configuration(); - try (BlobServer server = - new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore()); - BlobCacheService cache = - new BlobCacheService( - config, - temporaryFolder.newFolder(), - new VoidBlobStore(), - new InetSocketAddress("localhost", server.getPort()))) { + Tuple2<BlobServer, BlobCacheService> serverAndCache = + TestingBlobUtils.createServerAndCache(tempDir); + try (BlobServer server = serverAndCache.f0; + BlobCacheService cache = serverAndCache.f1) { server.start(); // store the data on the server (and blobStore), remove from local server store byte[] data = new byte[2000000]; rnd.nextBytes(data); PermanentBlobKey blobKey = (PermanentBlobKey) put(server, jobId, data, PERMANENT_BLOB); - assertTrue(server.getStorageLocation(jobId, blobKey).delete()); + assertThat(server.getStorageLocation(jobId, blobKey).delete()).isTrue(); File tempFileDir = server.createTemporaryFilename().getParentFile(); - // request the file from the server via the cache - exception.expect(IOException.class); - exception.expectMessage("Failed to fetch BLOB "); - try { - get(cache, jobId, blobKey); + assertThatThrownBy(() -> get(cache, jobId, blobKey)) + .isInstanceOf(IOException.class) + .hasMessageStartingWith("Failed to fetch BLOB"); } finally { HashSet<String> expectedDirs = new HashSet<>(); expectedDirs.add("incoming"); expectedDirs.add(JOB_DIR_PREFIX + jobId); // only the incoming and job directory should exist (no job directory!) File storageDir = tempFileDir.getParentFile(); String[] actualDirs = storageDir.list(); - assertNotNull(actualDirs); - assertEquals(expectedDirs, new HashSet<>(Arrays.asList(actualDirs))); + assertThat(actualDirs).isNotNull(); + assertThat(actualDirs).isNotEmpty(); + assertThat(new HashSet<>(Arrays.asList(actualDirs))).isEqualTo(expectedDirs); Review Comment: ```suggestion assertThat(actualDirs).containsOnlyElementsOf(expectedDirs); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java: ########## @@ -21,31 +21,23 @@ import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.AbstractID; import org.apache.flink.util.StringUtils; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.core.IsNot.not; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** This class contains unit tests for the {@link BlobKey} class. */ -public final class BlobKeyTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java: ########## @@ -345,29 +322,27 @@ private void testGetFailsStore(@Nullable final JobID jobId, BlobKey.BlobType blo .getStorageLocation(jobId, new TransientBlobKey()) .getParentFile(); } - assertTrue(jobStoreDir.setExecutable(true, false)); - assertTrue(jobStoreDir.setReadable(true, false)); - assertTrue(jobStoreDir.setWritable(false, false)); - - // request the file from the server via the cache - exception.expect(AccessDeniedException.class); + assertThat(jobStoreDir.setExecutable(true, false)).isTrue(); + assertThat(jobStoreDir.setReadable(true, false)).isTrue(); + assertThat(jobStoreDir.setWritable(false, false)).isTrue(); try { - get(cache, jobId, blobKey); + assertThatThrownBy(() -> get(cache, jobId, blobKey)) + .isInstanceOf(AccessDeniedException.class); } finally { // there should be no remaining incoming files File incomingFileDir = new File(jobStoreDir.getParent(), "incoming"); - assertArrayEquals(new String[] {}, incomingFileDir.list()); + assertThat(incomingFileDir.list()).isEmpty(); // there should be no files in the job directory - assertArrayEquals(new String[] {}, jobStoreDir.list()); + assertThat(jobStoreDir.list()).isEmpty(); // if transient, the get will fail but since the download was successful, the file // will not be on the server anymore if (blobType == TRANSIENT_BLOB) { verifyDeletedEventually(server, jobId, blobKey); } else { - assertTrue(server.getStorageLocation(jobId, blobKey).exists()); + assertThat(server.getStorageLocation(jobId, blobKey).exists()).isTrue(); Review Comment: ```suggestion assertThat(server.getStorageLocation(jobId, blobKey)).exists(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java: ########## @@ -18,113 +18,53 @@ package org.apache.flink.runtime.blob; -import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.core.testutils.FlinkAssertions; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.apache.commons.io.FileUtils; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; -import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.util.Arrays; -import java.util.Random; - -import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; -import static org.apache.flink.runtime.blob.BlobServerGetTest.get; -import static org.apache.flink.runtime.blob.BlobServerPutTest.put; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import java.nio.file.Path; /** * Tests how GET requests react to corrupt files when downloaded via a {@link BlobServer}. * * <p>Successful GET requests are tested in conjunction wit the PUT requests. */ -public class BlobServerCorruptionTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java: ########## @@ -19,36 +19,38 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import javax.annotation.Nullable; import java.io.IOException; -import java.net.InetSocketAddress; +import java.nio.file.Path; import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; import static org.apache.flink.runtime.blob.BlobServerPutTest.put; import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Unit tests for the blob cache retrying the connection to the server. */ -public class BlobCacheRetriesTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java: ########## @@ -407,7 +385,7 @@ private void testConcurrentDeleteOperations(@Nullable final JobID jobId) // in case of no lock, one of the delete operations should eventually fail waitFuture.get(); - assertFalse(server.getStorageLocation(jobId, blobKey).exists()); + assertThat(server.getStorageLocation(jobId, blobKey).exists()).isFalse(); Review Comment: `doesNotExist` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java: ########## @@ -369,24 +347,24 @@ private void testConcurrentDeleteOperations(@Nullable final JobID jobId) final byte[] data = {1, 2, 3}; - try (final BlobServer server = - new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore())) { - + try (BlobServer server = TestingBlobUtils.createServer(tempDir)) { server.start(); final TransientBlobKey blobKey = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB); - assertTrue(server.getStorageLocation(jobId, blobKey).exists()); + assertThat(server.getStorageLocation(jobId, blobKey).exists()).isTrue(); for (int i = 0; i < concurrentDeleteOperations; i++) { CompletableFuture<Void> deleteFuture = CompletableFuture.supplyAsync( () -> { try { - assertTrue(delete(server, jobId, blobKey)); - assertFalse( - server.getStorageLocation(jobId, blobKey).exists()); + assertThat(delete(server, jobId, blobKey)).isTrue(); + assertThat( + server.getStorageLocation(jobId, blobKey) + .exists()) + .isFalse(); Review Comment: `doesNotExist` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java: ########## @@ -75,29 +71,30 @@ * <p>Successful GET requests are tested in conjunction wit the PUT requests by {@link * BlobServerPutTest}. */ -public class BlobServerGetTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) +public class BlobServerGetTest { Review Comment: ```suggestion class BlobServerGetTest { ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java: ########## @@ -20,73 +20,73 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.net.ServerSocket; +import java.nio.file.Path; -import static org.hamcrest.CoreMatchers.allOf; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; /** Tests to ensure that the BlobServer properly starts on a specified range of available ports. */ -public class BlobServerRangeTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheTest.java: ########## @@ -40,10 +40,10 @@ /** Test for the {@link PermanentBlobCache}. */ @ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java: ########## @@ -65,24 +63,19 @@ import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals; import static org.apache.flink.runtime.blob.BlobServerGetTest.get; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeTrue; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; /** * Tests for successful and failing PUT operations against the BLOB server, and successful GET * operations. */ -public class BlobServerPutTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsNonWritableTest.java: ########## @@ -22,78 +22,81 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.OperatingSystem; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; +import java.nio.file.Path; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; /** Tests for {@link BlobUtils} working on non-writable directories. */ -public class BlobUtilsNonWritableTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) Review Comment: ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org