Jiabao-Sun commented on code in PR #24881:
URL: https://github.com/apache/flink/pull/24881#discussion_r1626824660


##########
flink-core/src/test/java/org/apache/flink/core/fs/AutoCloseableRegistryTest.java:
##########
@@ -66,9 +64,9 @@ public void testSuppressedExceptions() throws Exception {
 
             fail("Close should throw exception");
         } catch (Exception ex) {
-            assertEquals("1", ex.getMessage());
-            assertEquals("2", ex.getSuppressed()[0].getMessage());
-            assertEquals("java.lang.AssertionError: 3", 
ex.getSuppressed()[1].getMessage());
+            assertThat(ex.getMessage()).isEqualTo("1");
+            assertThat(ex.getSuppressed()[0].getMessage()).isEqualTo("2");
+            
assertThat(ex.getSuppressed()[1].getMessage()).isEqualTo("java.lang.AssertionError:
 3");

Review Comment:
   ```java
   assertThatThrownBy(autoCloseableRegistry::close)
                   .hasMessage("1")
                   .satisfies(
                           e -> assertThat(e.getSuppressed()[0]).hasMessage("2"),
                           e -> assertThat(e.getSuppressed()[1]).hasMessage("java.lang.AssertionError: 3"));
   ```



##########
flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java:
##########
@@ -156,37 +151,38 @@ public void testLocalFilesystem() throws Exception {
 
         testbytestest = new byte[5];
         final FSDataInputStream lfsinput2 = lfs.open(pathtotestfile2);
-        assertEquals(lfsinput2.read(testbytestest), 5);
+        assertThat(lfsinput2.read(testbytestest)).isEqualTo(5);
         lfsinput2.close();
-        assertTrue(Arrays.equals(testbytes, testbytestest));
+        assertThat(testbytestest).containsExactly(testbytes);
 
         // does lfs see two files?
-        assertEquals(lfs.listStatus(pathtotmpdir).length, 2);
+        assertThat(lfs.listStatus(pathtotmpdir)).hasSize(2);
 
         // do we get exactly one blocklocation per file? no matter what start 
and len we provide
-        
assertEquals(lfs.getFileBlockLocations(lfs.getFileStatus(pathtotestfile1), 0, 
0).length, 1);
+        
assertThat(lfs.getFileBlockLocations(lfs.getFileStatus(pathtotestfile1), 0, 
0).length)
+                .isOne();
 
         /*
          * can lfs delete files / directories?
          */
-        assertTrue(lfs.delete(pathtotestfile1, false));
+        assertThat(lfs.delete(pathtotestfile1, false)).isTrue();
 
         // and can lfs also delete directories recursively?
-        assertTrue(lfs.delete(pathtotmpdir, true));
+        assertThat(lfs.delete(pathtotmpdir, true)).isTrue();
 
-        assertTrue(!tempdir.exists());
+        assertThat(tempdir.exists()).isFalse();

Review Comment:
   ```suggestion
           assertThat(tempdir).doesNotExist();
   ```



##########
flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobRecoverableWriterTest.java:
##########
@@ -46,15 +43,11 @@ public class AzureBlobRecoverableWriterTest extends 
AbstractRecoverableWriterTes
     private static final String ACCESS_KEY = 
System.getenv("ARTIFACTS_AZURE_ACCESS_KEY");
     private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
 
-    @BeforeClass
+    @BeforeAll
     public static void checkCredentialsAndSetup() throws IOException {

Review Comment:
   ```suggestion
       static void checkCredentialsAndSetup() throws IOException {
   ```



##########
flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPosTest.java:
##########
@@ -20,90 +20,91 @@
 
 import org.apache.flink.configuration.ConfigConstants;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
 /** Tests for {@link ByteArrayOutputStreamWithPos}. */
-public class ByteArrayOutputStreamWithPosTest {
+class ByteArrayOutputStreamWithPosTest {
 
     private static final int BUFFER_SIZE = 32;
 
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
     private ByteArrayOutputStreamWithPos stream;
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         stream = new ByteArrayOutputStreamWithPos(BUFFER_SIZE);
     }
 
     /** Test setting position which is exactly the same with the buffer size. 
*/
     @Test
-    public void testSetPositionWhenBufferIsFull() throws Exception {
+    void testSetPositionWhenBufferIsFull() throws Exception {
         stream.write(new byte[BUFFER_SIZE]);
 
         // check whether the buffer is filled fully
-        Assert.assertEquals(BUFFER_SIZE, stream.getBuf().length);
+        assertThat(stream.getBuf()).hasSize(BUFFER_SIZE);
 
         // check current position is the end of the buffer
-        Assert.assertEquals(BUFFER_SIZE, stream.getPosition());
+        assertThat(stream.getPosition()).isEqualTo(BUFFER_SIZE);
 
         stream.setPosition(BUFFER_SIZE);
 
         // confirm current position is at where we expect.
-        Assert.assertEquals(BUFFER_SIZE, stream.getPosition());
+        assertThat(stream.getPosition()).isEqualTo(BUFFER_SIZE);
     }
 
     /** Test setting negative position. */
     @Test
-    public void testSetNegativePosition() throws Exception {
-        stream.write(new byte[BUFFER_SIZE]);
-
-        thrown.expect(IllegalArgumentException.class);
-        thrown.expectMessage("Position out of bounds");
-        stream.setPosition(-1);
+    void testSetNegativePosition() {
+        Throwable exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            stream.write(new byte[BUFFER_SIZE]);
+                            stream.setPosition(-1);
+                        });
+        assertThat(exception.getMessage()).contains("Position out of bounds");

Review Comment:
   assertThatThrownBy is preferred.



##########
flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java:
##########
@@ -46,21 +47,19 @@ public class HadoopRecoverableWriterTest extends 
AbstractRecoverableWriterTest {
 
     private static Path basePath;
 
-    @BeforeClass
+    @BeforeAll
     public static void testHadoopVersion() {

Review Comment:
   ```suggestion
      static void testHadoopVersion() {
   ```



##########
flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java:
##########
@@ -200,61 +196,63 @@ public void testRenamePath() throws IOException {
         FileSystem fs = FileSystem.getLocalFileSystem();
 
         // pre-conditions: /root/src/B exists but /root/dst/B does not
-        assertTrue(fs.exists(srcDirPath));
-        assertFalse(fs.exists(destDirPath));
+        assertThat(fs.exists(srcDirPath)).isTrue();
+        assertThat(fs.exists(destDirPath)).isFalse();
 
         // do the move/rename: /root/src/B -> /root/dst/
-        assertTrue(fs.rename(srcDirPath, destDirPath));
+        assertThat(fs.rename(srcDirPath, destDirPath)).isTrue();
 
         // post-conditions: /root/src/B doesn't exists, /root/dst/B/test.csv 
has been created
-        assertTrue(fs.exists(destFilePath));
-        assertFalse(fs.exists(srcDirPath));
+        assertThat(fs.exists(destFilePath)).isTrue();
+        assertThat(fs.exists(srcDirPath)).isFalse();
 
         // re-create source file and test overwrite
-        assertTrue(srcDirectory.mkdirs());
-        assertTrue(srcFile.createNewFile());
+        assertThat(srcDirectory.mkdirs()).isTrue();
+        assertThat(srcFile.createNewFile()).isTrue();
 
         // overwrite the destination file
-        assertTrue(fs.rename(srcFilePath, destFilePath));
+        assertThat(fs.rename(srcFilePath, destFilePath)).isTrue();
 
         // post-conditions: now only the src file has been moved
-        assertFalse(fs.exists(srcFilePath));
-        assertTrue(fs.exists(srcDirPath));
-        assertTrue(fs.exists(destFilePath));
+        assertThat(fs.exists(srcFilePath)).isFalse();
+        assertThat(fs.exists(srcDirPath)).isTrue();
+        assertThat(fs.exists(destFilePath)).isTrue();
     }
 
     @Test
-    public void testRenameNonExistingFile() throws IOException {
+    void testRenameNonExistingFile() throws IOException {
         final FileSystem fs = FileSystem.getLocalFileSystem();
 
-        final File srcFile = new File(temporaryFolder.newFolder(), 
"someFile.txt");
-        final File destFile = new File(temporaryFolder.newFolder(), "target");
+        File tmpDir = TempDirUtils.newFolder(tempFolder);
+        final File srcFile = new File(tmpDir, "someFile.txt");
+        final File destFile = new File(tmpDir, "target");
 
         final Path srcFilePath = new Path(srcFile.toURI());
         final Path destFilePath = new Path(destFile.toURI());
 
         // this cannot succeed because the source file does not exist
-        assertFalse(fs.rename(srcFilePath, destFilePath));
+        assertThat(fs.rename(srcFilePath, destFilePath)).isFalse();
     }
 
     @Test
-    @Category(FailsInGHAContainerWithRootUser.class)
-    public void testRenameFileWithNoAccess() throws IOException {
+    @Tag("FailsInGHAContainerWithRootUser")
+    @Disabled
+    void testRenameFileWithNoAccess() throws IOException {
         final FileSystem fs = FileSystem.getLocalFileSystem();
 
-        final File srcFile = temporaryFolder.newFile("someFile.txt");
-        final File destFile = new File(temporaryFolder.newFolder(), "target");
+        final File srcFile = TempDirUtils.newFile(tempFolder, "someFile.txt");
+        final File destFile = new File(TempDirUtils.newFolder(tempFolder), 
"target");
 
         // we need to make the file non-modifiable so that the rename fails
-        Assume.assumeTrue(srcFile.getParentFile().setWritable(false, false));
-        Assume.assumeTrue(srcFile.setWritable(false, false));
+        Assumptions.assumeTrue(srcFile.getParentFile().setWritable(false, 
false));

Review Comment:
   How about using `assumeThat`?



##########
flink-core/src/test/java/org/apache/flink/core/memory/OffHeapUnsafeMemorySegmentTest.java:
##########
@@ -47,23 +47,24 @@ MemorySegment createSegment(int size, Object owner) {
         return MemorySegmentFactory.allocateOffHeapUnsafeMemory(size, owner, 
() -> {});
     }
 
+    @TestTemplate
     @Override
-    @Test(expected = UnsupportedOperationException.class)
-    public void testByteBufferWrapping() {
-        createSegment(10).wrap(1, 2);
+    void testByteBufferWrapping(int pageSize) {
+        assertThatThrownBy(() -> createSegment(10).wrap(1, 2))
+                .isInstanceOf(UnsupportedOperationException.class);
     }
 
-    @Test
-    public void testCallCleanerOnFree() {
+    @TestTemplate
+    void testCallCleanerOnFree() {
         final CompletableFuture<Void> cleanerFuture = new 
CompletableFuture<>();
         MemorySegmentFactory.allocateOffHeapUnsafeMemory(
                         10, null, () -> cleanerFuture.complete(null))
                 .free();
-        assertTrue(cleanerFuture.isDone());
+        assertThat(cleanerFuture.isDone()).isTrue();

Review Comment:
   ```suggestion
           assertThat(cleanerFuture).isDone();
   ```



##########
flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java:
##########
@@ -46,21 +47,19 @@ public class HadoopRecoverableWriterTest extends 
AbstractRecoverableWriterTest {
 
     private static Path basePath;
 
-    @BeforeClass
+    @BeforeAll
     public static void testHadoopVersion() {
-        Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 7));
+        assumeThat(HadoopUtils.isMinHadoopVersion(2, 6)).isTrue();
     }
 
-    @BeforeClass
+    @BeforeAll
     public static void verifyOS() {
-        Assume.assumeTrue(
-                "HDFS cluster cannot be started on Windows without 
extensions.",
-                !OperatingSystem.isWindows());
+        assumeThat(OperatingSystem.isWindows()).isFalse();
     }
 
-    @BeforeClass
+    @BeforeAll
     public static void createHDFS() throws Exception {

Review Comment:
   ```suggestion
      static void createHDFS() throws Exception {
   ```



##########
flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java:
##########
@@ -23,59 +23,52 @@
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.file.FileAlreadyExistsException;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.junit.Assert.fail;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertThrows;
 
 /** A test validating that the initialization of local output paths is 
properly synchronized. */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(LocalFileSystem.class)
 public class InitOutputPathTest {

Review Comment:
   ```suggestion
   class InitOutputPathTest {
   ```



##########
flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java:
##########
@@ -88,66 +83,66 @@ public void testLocalFilesystem() throws Exception {
          */
 
         // check that dir is not existent yet
-        assertFalse(lfs.exists(pathtotmpdir));
-        assertTrue(tempdir.mkdirs());
+        assertThat(lfs.exists(pathtotmpdir)).isFalse();
+        assertThat(tempdir.mkdirs()).isTrue();
 
         // check that local file system recognizes file..
-        assertTrue(lfs.exists(pathtotmpdir));
+        assertThat(lfs.exists(pathtotmpdir)).isTrue();
         final FileStatus localstatus1 = lfs.getFileStatus(pathtotmpdir);
 
         // check that lfs recognizes directory..
-        assertTrue(localstatus1.isDir());
+        assertThat(localstatus1.isDir()).isTrue();
 
         // get status for files in this (empty) directory..
         final FileStatus[] statusforfiles = lfs.listStatus(pathtotmpdir);
 
         // no files in there.. hence, must be zero
-        assertTrue(statusforfiles.length == 0);
+        assertThat(statusforfiles).isEmpty();
 
         // check that lfs can delete directory..
         lfs.delete(pathtotmpdir, true);
 
         // double check that directory is not existent anymore..
-        assertFalse(lfs.exists(pathtotmpdir));
-        assertFalse(tempdir.exists());
+        assertThat(lfs.exists(pathtotmpdir)).isFalse();
+        assertThat(tempdir.exists()).isFalse();

Review Comment:
   ```suggestion
           assertThat(tempdir).doesNotExist();
   ```



##########
flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java:
##########
@@ -20,483 +20,420 @@
 
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
 import java.util.Calendar;
 import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.junit.Assert.*;
+import static org.assertj.core.api.Assertions.assertThat;
 
-public class LocatableSplitAssignerTest {
+class LocatableSplitAssignerTest {
 
     @Test
-    public void testSerialSplitAssignmentWithNullHost() {
-        try {
-            final int NUM_SPLITS = 50;
-            final String[][] hosts =
-                    new String[][] {new String[] {"localhost"}, new String[0], 
null};
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 3]));
-            }
+    void testSerialSplitAssignmentWithNullHost() {
+        final int NUM_SPLITS = 50;
+        final String[][] hosts = new String[][] {new String[] {"localhost"}, 
new String[0], null};
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit(null, 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % 3]));
+        }
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("", 0));
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit(null, 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
+        assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_SPLITS);
+        assertThat(ia.getNumberOfLocalAssignments()).isZero();
     }
 
     @Test
-    public void testSerialSplitAssignmentAllForSameHost() {
-        try {
-            final int NUM_SPLITS = 50;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, "testhost"));
-            }
+    void testSerialSplitAssignmentAllForSameHost() {
+        final int NUM_SPLITS = 50;
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
-
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, "testhost"));
+        }
 
-            assertEquals(0, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isZero();
+        assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentAllForRemoteHost() {
-        try {
-            final String[] hosts = {"host1", "host1", "host1", "host2", 
"host2", "host3"};
-            final int NUM_SPLITS = 10 * hosts.length;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 
hosts.length]));
-            }
+    void testSerialSplitAssignmentAllForRemoteHost() {
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
+        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", 
"host3"};
+        final int NUM_SPLITS = 10 * hosts.length;
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+        }
 
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_SPLITS);
+        assertThat(ia.getNumberOfLocalAssignments()).isZero();
     }
 
     @Test
-    public void testSerialSplitAssignmentSomeForRemoteHost() {
-        try {
-
-            // host1 reads all local
-            // host2 reads 10 local and 10 remote
-            // host3 reads all remote
-            final String[] hosts = {"host1", "host2", "host3"};
-            final int NUM_LOCAL_HOST1_SPLITS = 20;
-            final int NUM_LOCAL_HOST2_SPLITS = 10;
-            final int NUM_REMOTE_SPLITS = 30;
-            final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + 
NUM_LOCAL_HOST2_SPLITS;
-
-            // load local splits
-            int splitCnt = 0;
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            // host1 splits
-            for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "host1"));
-            }
-            // host2 splits
-            for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "host2"));
-            }
-            // load remote splits
-            for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
-            }
+    void testSerialSplitAssignmentSomeForRemoteHost() {
+
+        // host1 reads all local
+        // host2 reads 10 local and 10 remote
+        // host3 reads all remote
+        final String[] hosts = {"host1", "host2", "host3"};
+        final int NUM_LOCAL_HOST1_SPLITS = 20;
+        final int NUM_LOCAL_HOST2_SPLITS = 10;
+        final int NUM_REMOTE_SPLITS = 30;
+        final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + 
NUM_LOCAL_HOST2_SPLITS;
+
+        // load local splits
+        int splitCnt = 0;
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        // host1 splits
+        for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "host1"));
+        }
+        // host2 splits
+        for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "host2"));
+        }
+        // load remote splits
+        for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
+        }
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            int i = 0;
-            while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) 
!= null) {
-                assertTrue(splits.remove(is));
-            }
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        int i = 0;
+        while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) != 
null) {
+            assertThat(splits.remove(is)).isTrue();
+        }
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
 
-            assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        
assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_REMOTE_SPLITS);
+        
assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_LOCAL_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentMultiLocalHost() {
-        try {
-
-            final String[] localHosts = {"local1", "local2", "local3"};
-            final String[] remoteHosts = {"remote1", "remote2", "remote3"};
-            final String[] requestingHosts = {"local3", "local2", "local1", 
"other"};
-
-            final int NUM_THREE_LOCAL_SPLITS = 10;
-            final int NUM_TWO_LOCAL_SPLITS = 10;
-            final int NUM_ONE_LOCAL_SPLITS = 10;
-            final int NUM_LOCAL_SPLITS = 30;
-            final int NUM_REMOTE_SPLITS = 10;
-            final int NUM_SPLITS = 40;
-
-            String[] threeLocalHosts = localHosts;
-            String[] twoLocalHosts = {localHosts[0], localHosts[1], 
remoteHosts[0]};
-            String[] oneLocalHost = {localHosts[0], remoteHosts[0], 
remoteHosts[1]};
-            String[] noLocalHost = remoteHosts;
-
-            int splitCnt = 0;
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            // add splits with three local hosts
-            for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, 
threeLocalHosts));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, noLocalHost));
-            }
+    void testSerialSplitAssignmentMultiLocalHost() {
+
+        final String[] localHosts = {"local1", "local2", "local3"};
+        final String[] remoteHosts = {"remote1", "remote2", "remote3"};
+        final String[] requestingHosts = {"local3", "local2", "local1", 
"other"};
+
+        final int NUM_THREE_LOCAL_SPLITS = 10;
+        final int NUM_TWO_LOCAL_SPLITS = 10;
+        final int NUM_ONE_LOCAL_SPLITS = 10;
+        final int NUM_LOCAL_SPLITS = 30;
+        final int NUM_REMOTE_SPLITS = 10;
+        final int NUM_SPLITS = 40;
+
+        String[] threeLocalHosts = localHosts;
+        String[] twoLocalHosts = {localHosts[0], localHosts[1], 
remoteHosts[0]};
+        String[] oneLocalHost = {localHosts[0], remoteHosts[0], 
remoteHosts[1]};
+        String[] noLocalHost = remoteHosts;
+
+        int splitCnt = 0;
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        // add splits with three local hosts
+        for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, threeLocalHosts));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, noLocalHost));
+        }
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            LocatableInputSplit is = null;
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                String host = requestingHosts[i % requestingHosts.length];
-                is = ia.getNextInputSplit(host, 0);
-                // check valid split
-                assertTrue(is != null);
-                // check unassigned split
-                assertTrue(splits.remove(is));
-                // check priority of split
-                if (host.equals(localHosts[0])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), oneLocalHost));
-                } else if (host.equals(localHosts[1])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), 
twoLocalHosts));
-                } else if (host.equals(localHosts[2])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), 
threeLocalHosts));
-                } else {
-                    assertTrue(Arrays.equals(is.getHostnames(), noLocalHost));
-                }
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        LocatableInputSplit is = null;
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            String host = requestingHosts[i % requestingHosts.length];
+            is = ia.getNextInputSplit(host, 0);
+            // check valid split
+            assertThat(is != null).isTrue();

Review Comment:
   ```suggestion
               assertThat(is).isNotNull();
   ```



##########
flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java:
##########
@@ -74,7 +73,7 @@ public static void createHDFS() throws Exception {
         basePath = new Path(hdfs.getUri() + "/tests");
     }
 
-    @AfterClass
+    @AfterAll
     public static void destroyHDFS() throws Exception {

Review Comment:
   ```suggestion
      static void destroyHDFS() throws Exception {
   ```



##########
flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobRecoverableWriterTest.java:
##########
@@ -75,7 +68,7 @@ public static void checkCredentialsAndSetup() throws 
IOException {
         FileSystem.initialize(conf);
     }
 
-    @AfterClass
+    @AfterAll
     public static void afterClass() {

Review Comment:
   ```suggestion
      static void afterClass() {
   ```



##########
flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java:
##########
@@ -46,21 +47,19 @@ public class HadoopRecoverableWriterTest extends 
AbstractRecoverableWriterTest {
 
     private static Path basePath;
 
-    @BeforeClass
+    @BeforeAll
     public static void testHadoopVersion() {
-        Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 7));
+        assumeThat(HadoopUtils.isMinHadoopVersion(2, 6)).isTrue();
     }
 
-    @BeforeClass
+    @BeforeAll
     public static void verifyOS() {

Review Comment:
   ```suggestion
      static void verifyOS() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to