ferenc-csaky commented on code in PR #24483: URL: https://github.com/apache/flink/pull/24483#discussion_r1521029537
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTest.java: ########## @@ -21,33 +21,32 @@ import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.TestLogger; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for different {@link StreamPartitioner} implementations. */ -public abstract class StreamPartitionerTest extends TestLogger { +abstract class StreamPartitionerTest { protected final StreamPartitioner<Tuple> streamPartitioner = createPartitioner(); protected final StreamRecord<Tuple> streamRecord = new StreamRecord<>(null); protected final SerializationDelegate<StreamRecord<Tuple>> serializationDelegate = new SerializationDelegate<>(null); - abstract StreamPartitioner<Tuple> createPartitioner(); + protected abstract StreamPartitioner<Tuple> createPartitioner(); Review Comment: If the class itself is package-private (which I agree with), I think we should also change every `protected` method and field to package-private as well. And then the same is true for the child classes that override this `abstract` method. WDYT? ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java: ########## @@ -2009,19 +2005,18 @@ public long getCurrentProcessingTime() { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), timestamp)); // the garbage collection timer would wrap-around - Assert.assertTrue(window.maxTimestamp() + lateness < window.maxTimestamp()); + assertThat(window.maxTimestamp() + lateness).isLessThan(window.maxTimestamp()); // and it would prematurely fire with watermark (Long.MAX_VALUE - 1500) - Assert.assertTrue(window.maxTimestamp() + lateness < Long.MAX_VALUE - 1500); + assertThat(window.maxTimestamp() + lateness).isLessThan(Long.MAX_VALUE - 1500); // if we don't correctly prevent wrap-around in the garbage collection // timers this watermark will clean our window state for the just-added // element/window testHarness.processWatermark(new Watermark(Long.MAX_VALUE - 1500)); // this watermark is before the end timestamp of our only window - Assert.assertTrue(Long.MAX_VALUE - 1500 < window.maxTimestamp()); - Assert.assertTrue(window.maxTimestamp() < Long.MAX_VALUE); + assertThat(window.maxTimestamp()).isBetween(Long.MAX_VALUE - 1500, Long.MAX_VALUE); Review Comment: This should be `isStrictlyBetween` I think; the range checked previously was exclusive on both ends.
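For reference, a minimal sketch of the exclusive-bounds variant, reusing the expressions from the hunk above (not verified against the PR branch):

```java
// isBetween(a, b) includes both bounds; isStrictlyBetween(a, b) excludes them,
// which matches the original pair of strict '<' checks.
assertThat(window.maxTimestamp()).isStrictlyBetween(Long.MAX_VALUE - 1500, Long.MAX_VALUE);
```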
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java: ########## @@ -151,53 +132,44 @@ public void testTimeUnits() { TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)); when(mockContext.getCurrentProcessingTime()).thenReturn(1000L); - assertThat( - assigner.assignWindows("String", Long.MIN_VALUE, mockContext), - contains(timeWindow(1000, 6000))); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext)) + .contains(new TimeWindow(1000, 6000)); when(mockContext.getCurrentProcessingTime()).thenReturn(5999L); - assertThat( - assigner.assignWindows("String", Long.MIN_VALUE, mockContext), - contains(timeWindow(1000, 6000))); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext)) + .contains(new TimeWindow(1000, 6000)); when(mockContext.getCurrentProcessingTime()).thenReturn(6000L); - assertThat( - assigner.assignWindows("String", Long.MIN_VALUE, mockContext), - contains(timeWindow(6000, 11000))); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext)) + .contains(new TimeWindow(6000, 11000)); } @Test - public void testInvalidParameters() { - try { - TumblingProcessingTimeWindows.of(Time.seconds(-1)); - fail("should fail"); - } catch (IllegalArgumentException e) { - assertThat(e.toString(), containsString("abs(offset) < size")); - } - - try { - TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(20)); - fail("should fail"); - } catch (IllegalArgumentException e) { - assertThat(e.toString(), containsString("abs(offset) < size")); - } - - try { - TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(-11)); - fail("should fail"); - } catch (IllegalArgumentException e) { - assertThat(e.toString(), containsString("abs(offset) < size")); - } + void testInvalidParameters() { + + assertThatThrownBy(() -> TumblingProcessingTimeWindows.of(Time.seconds(-1))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("TumblingProcessingTimeWindows"); Review Comment: Previously the message was `abs(offset) < size` here. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java: ########## @@ -166,12 +165,12 @@ private void testRestoreWithInterrupt(int mode) throws Exception { task.getExecutingThread().join(30000); - if (task.getExecutionState() == ExecutionState.CANCELING) { - fail("Task is stuck and not canceling"); - } + assertThat(task.getExecutionState()) + .as("Task is stuck and not canceling") + .isEqualTo(ExecutionState.CANCELED); Review Comment: I think the equivalent here would be:

```java
assertThat(task.getExecutionState())
        .as("Task is stuck and not canceling")
        .isNotEqualTo(ExecutionState.CANCELING);
```

But I am not sure if it makes sense to keep this assert at all. Without knowing much context about what happens in the background, the next assert makes sure the state is `CANCELED`, so this check and the previous `fail` seem unnecessary to me. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java: ########## Review Comment: Maybe upgrade the deprecated `org.apache.flink.streaming.api.windowing.time.Time` usage to `Duration`? This is probably arguable as it is orthogonal to the JUnit upgrade itself, but if I remember correctly one of the previous JUnit5 migration PRs already had such changes.
Since we make extensive changes anyway, I think it can be justified to aggregate these changes. WDYT? ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java: ########## @@ -657,64 +650,63 @@ public TaskMetricGroup getMetricGroup() { (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); - Assert.assertEquals( - "A metric was registered multiple times.", - 7, - new HashSet<>( + assertThat( + new HashSet<>( Review Comment: nit: Here, the only reason for collecting the retrieved `Gauge` instances into a `Set` is to validate that none of them are `null`. I think having separate non-null assertions for the `Gauge` objects would be less complex and more readable compared to the current logic, but this might be personal preference, so if you feel like leaving it as is, do so. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java: ########## @@ -18,304 +18,274 @@ package org.apache.flink.streaming.runtime.operators.windowing; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.HashMap; import java.util.Random; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link KeyMap}. */ -public class KeyMapTest { +class KeyMapTest { @Test - public void testInitialSizeComputation() { + void testInitialSizeComputation() { + KeyMap<String, String> map; + + map = new KeyMap<>(); + assertThat(map.getCurrentTableCapacity()).isEqualTo(64); + assertThat(map.getLog2TableCapacity()).isEqualTo(6); + assertThat(map.getShift()).isEqualTo(24); + assertThat(map.getRehashThreshold()).isEqualTo(48); + + map = new KeyMap<>(0); + assertThat(map.getCurrentTableCapacity()).isEqualTo(64); + assertThat(map.getLog2TableCapacity()).isEqualTo(6); + assertThat(map.getShift()).isEqualTo(24); + assertThat(map.getRehashThreshold()).isEqualTo(48); + + map = new KeyMap<>(1); + assertThat(map.getCurrentTableCapacity()).isEqualTo(64); + assertThat(map.getLog2TableCapacity()).isEqualTo(6); + assertThat(map.getShift()).isEqualTo(24); + assertThat(map.getRehashThreshold()).isEqualTo(48); + + map = new KeyMap<>(9); + assertThat(map.getCurrentTableCapacity()).isEqualTo(64); + assertThat(map.getLog2TableCapacity()).isEqualTo(6); + assertThat(map.getShift()).isEqualTo(24); + assertThat(map.getRehashThreshold()).isEqualTo(48); + + map = new KeyMap<>(63); + assertThat(map.getCurrentTableCapacity()).isEqualTo(64); + assertThat(map.getLog2TableCapacity()).isEqualTo(6); + assertThat(map.getShift()).isEqualTo(24); + assertThat(map.getRehashThreshold()).isEqualTo(48); + + map = new KeyMap<>(64); + assertThat(map.getCurrentTableCapacity()).isEqualTo(128); + assertThat(map.getLog2TableCapacity()).isEqualTo(7); + assertThat(map.getShift()).isEqualTo(23); + assertThat(map.getRehashThreshold()).isEqualTo(96); + + map = new KeyMap<>(500); + assertThat(map.getCurrentTableCapacity()).isEqualTo(512); + assertThat(map.getLog2TableCapacity()).isEqualTo(9); + assertThat(map.getShift()).isEqualTo(21); + assertThat(map.getRehashThreshold()).isEqualTo(384); + + map = new KeyMap<>(127); + assertThat(map.getCurrentTableCapacity()).isEqualTo(128); +
assertThat(map.getLog2TableCapacity()).isEqualTo(7); + assertThat(map.getShift()).isEqualTo(23); + assertThat(map.getRehashThreshold()).isEqualTo(96); + + // no negative number of elements + assertThatThrownBy(() -> new KeyMap<>(-1)).isInstanceOf(IllegalArgumentException.class); + + // check integer overflow try { - KeyMap<String, String> map; - - map = new KeyMap<>(); - assertEquals(64, map.getCurrentTableCapacity()); - assertEquals(6, map.getLog2TableCapacity()); - assertEquals(24, map.getShift()); - assertEquals(48, map.getRehashThreshold()); - - map = new KeyMap<>(0); - assertEquals(64, map.getCurrentTableCapacity()); - assertEquals(6, map.getLog2TableCapacity()); - assertEquals(24, map.getShift()); - assertEquals(48, map.getRehashThreshold()); - - map = new KeyMap<>(1); - assertEquals(64, map.getCurrentTableCapacity()); - assertEquals(6, map.getLog2TableCapacity()); - assertEquals(24, map.getShift()); - assertEquals(48, map.getRehashThreshold()); - - map = new KeyMap<>(9); - assertEquals(64, map.getCurrentTableCapacity()); - assertEquals(6, map.getLog2TableCapacity()); - assertEquals(24, map.getShift()); - assertEquals(48, map.getRehashThreshold()); - - map = new KeyMap<>(63); - assertEquals(64, map.getCurrentTableCapacity()); - assertEquals(6, map.getLog2TableCapacity()); - assertEquals(24, map.getShift()); - assertEquals(48, map.getRehashThreshold()); - - map = new KeyMap<>(64); - assertEquals(128, map.getCurrentTableCapacity()); - assertEquals(7, map.getLog2TableCapacity()); - assertEquals(23, map.getShift()); - assertEquals(96, map.getRehashThreshold()); - - map = new KeyMap<>(500); - assertEquals(512, map.getCurrentTableCapacity()); - assertEquals(9, map.getLog2TableCapacity()); - assertEquals(21, map.getShift()); - assertEquals(384, map.getRehashThreshold()); - - map = new KeyMap<>(127); - assertEquals(128, map.getCurrentTableCapacity()); - assertEquals(7, map.getLog2TableCapacity()); - assertEquals(23, map.getShift()); - assertEquals(96, map.getRehashThreshold()); - - // no negative number of elements - try { - new KeyMap<>(-1); - fail("should fail with an exception"); - } catch (IllegalArgumentException e) { - // expected - } - - // check integer overflow - try { - map = new KeyMap<>(0x65715522); - - final int maxCap = Integer.highestOneBit(Integer.MAX_VALUE); - assertEquals( - Integer.highestOneBit(Integer.MAX_VALUE), map.getCurrentTableCapacity()); - assertEquals(30, map.getLog2TableCapacity()); - assertEquals(0, map.getShift()); - assertEquals(maxCap / 4 * 3, map.getRehashThreshold()); - } catch (OutOfMemoryError e) { - // this may indeed happen in small test setups. we tolerate this in this test - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + map = new KeyMap<>(0x65715522); + + final int maxCap = Integer.highestOneBit(Integer.MAX_VALUE); + assertThat(map.getCurrentTableCapacity()).isEqualTo(maxCap); + assertThat(map.getLog2TableCapacity()).isEqualTo(30); + assertThat(map.getShift()).isZero(); + assertThat(map.getRehashThreshold()).isEqualTo(maxCap / 4 * 3); + } catch (OutOfMemoryError e) { + // this may indeed happen in small test setups. 
we tolerate this in this test } } @Test - public void testPutAndGetRandom() { - try { - final KeyMap<Integer, Integer> map = new KeyMap<>(); - final Random rnd = new Random(); + void testPutAndGetRandom() { + final KeyMap<Integer, Integer> map = new KeyMap<>(); + final Random rnd = new Random(); - final long seed = rnd.nextLong(); - final int numElements = 10000; + final long seed = rnd.nextLong(); + final int numElements = 10000; - final HashMap<Integer, Integer> groundTruth = new HashMap<>(); + final HashMap<Integer, Integer> groundTruth = new HashMap<>(); - rnd.setSeed(seed); - for (int i = 0; i < numElements; i++) { - Integer key = rnd.nextInt(); - Integer value = rnd.nextInt(); + rnd.setSeed(seed); + for (int i = 0; i < numElements; i++) { + Integer key = rnd.nextInt(); + Integer value = rnd.nextInt(); - if (rnd.nextBoolean()) { - groundTruth.put(key, value); - map.put(key, value); - } + if (rnd.nextBoolean()) { + groundTruth.put(key, value); + map.put(key, value); } + } - rnd.setSeed(seed); - for (int i = 0; i < numElements; i++) { - Integer key = rnd.nextInt(); - - // skip these, evaluating it is tricky due to duplicates - rnd.nextInt(); - rnd.nextBoolean(); - - Integer expected = groundTruth.get(key); - if (expected == null) { - assertNull(map.get(key)); - } else { - Integer contained = map.get(key); - assertNotNull(contained); - assertEquals(expected, contained); - } + rnd.setSeed(seed); + for (int i = 0; i < numElements; i++) { + Integer key = rnd.nextInt(); + + // skip these, evaluating it is tricky due to duplicates + rnd.nextInt(); + rnd.nextBoolean(); + + Integer expected = groundTruth.get(key); + if (expected == null) { + assertThat(map.get(key)).isNull(); + } else { + Integer contained = map.get(key); + assertThat(contained).isNotNull().isEqualTo(expected); } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } } @Test - public void testConjunctTraversal() { - try { - final Random rootRnd = new Random(654685486325439L); - - final int numMaps = 7; - final int numKeys = 1000000; + void testConjunctTraversal() throws Exception { + final Random rootRnd = new Random(654685486325439L); + + final int numMaps = 7; + final int numKeys = 1000000; + + // ------ create a set of maps ------ + @SuppressWarnings("unchecked") + final KeyMap<Integer, Integer>[] maps = + (KeyMap<Integer, Integer>[]) new KeyMap<?, ?>[numMaps]; + for (int i = 0; i < numMaps; i++) { + maps[i] = new KeyMap<>(); + } - // ------ create a set of maps ------ - @SuppressWarnings("unchecked") - final KeyMap<Integer, Integer>[] maps = - (KeyMap<Integer, Integer>[]) new KeyMap<?, ?>[numMaps]; - for (int i = 0; i < numMaps; i++) { - maps[i] = new KeyMap<>(); + // ------ prepare probabilities for maps ------ + final double[] probabilities = new double[numMaps]; + final double[] probabilitiesTemp = new double[numMaps]; + { + probabilities[0] = 0.5; + double remainingProb = 1.0 - probabilities[0]; + for (int i = 1; i < numMaps - 1; i++) { + remainingProb /= 2; + probabilities[i] = remainingProb; } - // ------ prepare probabilities for maps ------ - final double[] probabilities = new double[numMaps]; - final double[] probabilitiesTemp = new double[numMaps]; - { - probabilities[0] = 0.5; - double remainingProb = 1.0 - probabilities[0]; - for (int i = 1; i < numMaps - 1; i++) { - remainingProb /= 2; - probabilities[i] = remainingProb; - } + // compensate for rounding errors + probabilities[numMaps - 1] = remainingProb; + } - // compensate for rounding errors - probabilities[numMaps - 1] = remainingProb; - 
} + // ------ generate random elements ------ + final long probSeed = rootRnd.nextLong(); + final long keySeed = rootRnd.nextLong(); - // ------ generate random elements ------ - final long probSeed = rootRnd.nextLong(); - final long keySeed = rootRnd.nextLong(); + final Random probRnd = new Random(probSeed); + final Random keyRnd = new Random(keySeed); - final Random probRnd = new Random(probSeed); - final Random keyRnd = new Random(keySeed); + final int maxStride = Integer.MAX_VALUE / numKeys; - final int maxStride = Integer.MAX_VALUE / numKeys; + int totalNumElements = 0; + int nextKeyValue = 1; - int totalNumElements = 0; - int nextKeyValue = 1; + for (int i = 0; i < numKeys; i++) { + int numCopies = (nextKeyValue % 3) + 1; + System.arraycopy(probabilities, 0, probabilitiesTemp, 0, numMaps); - for (int i = 0; i < numKeys; i++) { - int numCopies = (nextKeyValue % 3) + 1; - System.arraycopy(probabilities, 0, probabilitiesTemp, 0, numMaps); + double totalProb = 1.0; + for (int copy = 0; copy < numCopies; copy++) { + int pos = drawPosProportionally(probabilitiesTemp, totalProb, probRnd); + totalProb -= probabilitiesTemp[pos]; + probabilitiesTemp[pos] = 0.0; - double totalProb = 1.0; - for (int copy = 0; copy < numCopies; copy++) { - int pos = drawPosProportionally(probabilitiesTemp, totalProb, probRnd); - totalProb -= probabilitiesTemp[pos]; - probabilitiesTemp[pos] = 0.0; + Integer boxed = nextKeyValue; + Integer previous = maps[pos].put(boxed, boxed); + assertThat(previous).as("Test problem - test does not assign unique maps").isNull(); + } - Integer boxed = nextKeyValue; - Integer previous = maps[pos].put(boxed, boxed); - assertNull("Test problem - test does not assign unique maps", previous); - } + totalNumElements += numCopies; + nextKeyValue += keyRnd.nextInt(maxStride) + 1; + } - totalNumElements += numCopies; - nextKeyValue += keyRnd.nextInt(maxStride) + 1; + // check that all maps contain the total number of elements + int numContained = 0; + for (KeyMap<?, ?> map : maps) { + numContained += map.size(); + } + assertThat(numContained).isEqualTo(totalNumElements); + + // ------ check that all elements can be found in the maps ------ + keyRnd.setSeed(keySeed); + + numContained = 0; + nextKeyValue = 1; + for (int i = 0; i < numKeys; i++) { + int numCopiesExpected = (nextKeyValue % 3) + 1; + int numCopiesContained = 0; + + for (KeyMap<Integer, Integer> map : maps) { + Integer val = map.get(nextKeyValue); + if (val != null) { + assertThat(val).isEqualTo(nextKeyValue); + numCopiesContained++; + } } - // check that all maps contain the total number of elements - int numContained = 0; - for (KeyMap<?, ?> map : maps) { - numContained += map.size(); - } - assertEquals(totalNumElements, numContained); - - // ------ check that all elements can be found in the maps ------ - keyRnd.setSeed(keySeed); - - numContained = 0; - nextKeyValue = 1; - for (int i = 0; i < numKeys; i++) { - int numCopiesExpected = (nextKeyValue % 3) + 1; - int numCopiesContained = 0; - - for (KeyMap<Integer, Integer> map : maps) { - Integer val = map.get(nextKeyValue); - if (val != null) { - assertEquals(nextKeyValue, val.intValue()); - numCopiesContained++; + assertThat(numCopiesContained).isEqualTo(numCopiesExpected); + numContained += numCopiesContained; + + nextKeyValue += keyRnd.nextInt(maxStride) + 1; + } + assertThat(numContained).isEqualTo(totalNumElements); + + // ------ make a traversal over all keys and validate the keys in the traversal ------ + final int[] keysStartedAndFinished = {0, 0}; + 
KeyMap.TraversalEvaluator<Integer, Integer> traversal = + new KeyMap.TraversalEvaluator<Integer, Integer>() { + + private int key; + private int valueCount; + + @Override + public void startNewKey(Integer key) { + this.key = key; + this.valueCount = 0; + + keysStartedAndFinished[0]++; } - } - assertEquals(numCopiesExpected, numCopiesContained); - numContained += numCopiesContained; + @Override + public void nextValue(Integer value) { + assertThat(value).isEqualTo(this.key); + this.valueCount++; + } - nextKeyValue += keyRnd.nextInt(maxStride) + 1; - } - assertEquals(totalNumElements, numContained); - - // ------ make a traversal over all keys and validate the keys in the traversal ------ - final int[] keysStartedAndFinished = {0, 0}; - KeyMap.TraversalEvaluator<Integer, Integer> traversal = - new KeyMap.TraversalEvaluator<Integer, Integer>() { - - private int key; - private int valueCount; - - @Override - public void startNewKey(Integer key) { - this.key = key; - this.valueCount = 0; - - keysStartedAndFinished[0]++; - } - - @Override - public void nextValue(Integer value) { - assertEquals(this.key, value.intValue()); - this.valueCount++; - } - - @Override - public void keyDone() { - int expected = (key % 3) + 1; - if (expected != valueCount) { - fail( + @Override + public void keyDone() { + int expected = (key % 3) + 1; + assertThat(valueCount) + .as( "Wrong count for key " + key + " ; expected=" + expected + " , count=" - + valueCount); - } + + valueCount) + .isEqualTo(expected); - keysStartedAndFinished[1]++; - } - }; + keysStartedAndFinished[1]++; + } + }; - KeyMap.traverseMaps(shuffleArray(maps, rootRnd), traversal, 17); + KeyMap.traverseMaps(shuffleArray(maps, rootRnd), traversal, 17); - assertEquals(numKeys, keysStartedAndFinished[0]); - assertEquals(numKeys, keysStartedAndFinished[1]); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + assertThat(keysStartedAndFinished[0]).isEqualTo(numKeys); + assertThat(keysStartedAndFinished[1]).isEqualTo(numKeys); } @Test - public void testSizeComparator() { - try { - KeyMap<String, String> map1 = new KeyMap<>(5); - KeyMap<String, String> map2 = new KeyMap<>(80); - - assertTrue(map1.getCurrentTableCapacity() < map2.getCurrentTableCapacity()); - - assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map1) == 0); - assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map2) == 0); - assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map2) > 0); - assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map1) < 0); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + void testSizeComparator() { + KeyMap<String, String> map1 = new KeyMap<>(5); + KeyMap<String, String> map2 = new KeyMap<>(80); + + assertThat(map1.getCurrentTableCapacity()).isLessThan(map2.getCurrentTableCapacity()); + + assertThat(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map1) == 0).isTrue(); + assertThat(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map2) == 0).isTrue(); + assertThat(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map2) > 0).isTrue(); + assertThat(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map1) < 0).isTrue(); Review Comment: nit: Maybe use `isZero()`, `isPositive()`, and `isNegative()` instead of boolean checks. 
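Something along these lines, just a sketch of the suggestion reusing the comparator calls from the hunk above:

```java
// same instance compares equal; in descending capacity order the smaller map1 sorts after the larger map2
assertThat(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map1)).isZero();
assertThat(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map2)).isZero();
assertThat(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map2)).isPositive();
assertThat(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map1)).isNegative();
```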
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java: ########## @@ -895,39 +890,38 @@ public TaskMetricGroup getMetricGroup() { (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); - Assert.assertEquals( - "A metric was registered multiple times.", - 5, - new HashSet<>( + assertThat( Review Comment: nit: Here, the only reason for collecting the retrieved `Gauge` instances into a `Set` is to validate that none of them are `null`. I think having 5 non-null assertions for the `Gauge` objects would be less complex and more readable compared to the current logic, but this might be personal preference, so if you feel like leaving it as is, do so. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java: ########## @@ -177,4 +178,26 @@ public String map2(String value) throws Exception { return value; } } + + /** A validation matcher for checkpoint exception against failure reason. */ + private static class CheckpointExceptionMatcher extends BaseMatcher<CheckpointException> { + + private final CheckpointFailureReason failureReason; + + public CheckpointExceptionMatcher(CheckpointFailureReason failureReason) { Review Comment: Ctor could be `private` too. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java: ########## @@ -113,21 +113,17 @@ public void testSpecialRuleReading() throws Exception { } @Test - public void testReadFinishedInput() throws Exception { - try { - testBase( - new TestReadFinishedInputStreamOperator(), - false, - new ConcurrentLinkedQueue<>(), - true); - fail("should throw an IOException"); - } catch (Exception t) { - if (!ExceptionUtils.findThrowableWithMessage( - t, "all selected inputs are already finished") - .isPresent()) { - throw t; - } - } + void testReadFinishedInput() { + assertThatThrownBy( + () -> + testBase( + new TestReadFinishedInputStreamOperator(), + false, + new ConcurrentLinkedQueue<>(), + true)) + .hasCauseInstanceOf(IOException.class) + .rootCause() + .hasMessageContaining("all selected inputs are already finished"); Review Comment: The refactored assert is stricter here, as the previous code did not validate the cause type and succeeded if any exception in the chain contained the given message. Personally, I am okay with this change, just wanted to highlight it. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java: ########## @@ -1206,18 +1201,15 @@ public void initializeState(StateInitializationContext context) throws Exception context.getOperatorStateStore().getListState(TEST_DESCRIPTOR); if (numberSnapshotCalls == 0) { - for (Integer v : partitionableState.get()) { - fail(); - } + assertThat(partitionableState.get()).isEmpty(); } else { Set<Integer> result = new HashSet<>(); for (Integer v : partitionableState.get()) { result.add(v); } - assertEquals(2, result.size()); - assertTrue(result.contains(42)); - assertTrue(result.contains(4711)); + assertThat(result).hasSize(2); + assertThat(result).contains(42, 4711); Review Comment: These two could be merged into `assertThat(result).containsExactlyInAnyOrder(42, 4711);`.
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java: ########## @@ -227,97 +220,92 @@ public void testLateMerging() throws Exception { // add several non-overlapping initial windows mergeFunction.reset(); - assertEquals( - new TimeWindow(0, 3), windowSet.addWindow(new TimeWindow(0, 3), mergeFunction)); - assertFalse(mergeFunction.hasMerged()); - assertEquals(new TimeWindow(0, 3), windowSet.getStateWindow(new TimeWindow(0, 3))); + assertThat(windowSet.addWindow(new TimeWindow(0, 3), mergeFunction)) + .isEqualTo(new TimeWindow(0, 3)); + assertThat(mergeFunction.hasMerged()).isFalse(); + assertThat(windowSet.getStateWindow(new TimeWindow(0, 3))).isEqualTo(new TimeWindow(0, 3)); mergeFunction.reset(); - assertEquals( - new TimeWindow(5, 8), windowSet.addWindow(new TimeWindow(5, 8), mergeFunction)); - assertFalse(mergeFunction.hasMerged()); - assertEquals(new TimeWindow(5, 8), windowSet.getStateWindow(new TimeWindow(5, 8))); + assertThat(windowSet.addWindow(new TimeWindow(5, 8), mergeFunction)) + .isEqualTo(new TimeWindow(5, 8)); + assertThat(mergeFunction.hasMerged()).isFalse(); + assertThat(windowSet.getStateWindow(new TimeWindow(5, 8))).isEqualTo(new TimeWindow(5, 8)); mergeFunction.reset(); - assertEquals( - new TimeWindow(10, 13), windowSet.addWindow(new TimeWindow(10, 13), mergeFunction)); - assertFalse(mergeFunction.hasMerged()); - assertEquals(new TimeWindow(10, 13), windowSet.getStateWindow(new TimeWindow(10, 13))); + assertThat(windowSet.addWindow(new TimeWindow(10, 13), mergeFunction)) + .isEqualTo(new TimeWindow(10, 13)); + assertThat(mergeFunction.hasMerged()).isFalse(); + assertThat(windowSet.getStateWindow(new TimeWindow(10, 13))) + .isEqualTo(new TimeWindow(10, 13)); // add a window that merges the later two windows mergeFunction.reset(); - assertEquals( - new TimeWindow(5, 13), windowSet.addWindow(new TimeWindow(8, 10), mergeFunction)); - assertTrue(mergeFunction.hasMerged()); - assertEquals(new TimeWindow(5, 13), mergeFunction.mergeTarget()); - assertThat( - mergeFunction.stateWindow(), - anyOf(is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13)))); - assertThat( - mergeFunction.mergeSources(), - containsInAnyOrder(new TimeWindow(5, 8), new TimeWindow(10, 13))); - assertThat( - mergeFunction.mergedStateWindows(), - anyOf( - containsInAnyOrder(new TimeWindow(10, 13)), - containsInAnyOrder(new TimeWindow(5, 8)))); - assertThat(mergeFunction.mergedStateWindows(), not(hasItem(mergeFunction.mergeTarget()))); + assertThat(windowSet.addWindow(new TimeWindow(8, 10), mergeFunction)) + .isEqualTo(new TimeWindow(5, 13)); + assertThat(mergeFunction.hasMerged()).isTrue(); + assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(5, 13)); + assertThat(mergeFunction.stateWindow()) + .satisfiesAnyOf( + w -> assertThat(w).isEqualTo(new TimeWindow(5, 8)), + w -> assertThat(w).isEqualTo(new TimeWindow(10, 13))); + assertThat(mergeFunction.mergeSources()) + .containsExactlyInAnyOrder(new TimeWindow(5, 8), new TimeWindow(10, 13)); + assertThat(mergeFunction.mergedStateWindows()) + .containsAnyOf(new TimeWindow(5, 8), new TimeWindow(10, 13)); - assertEquals(new TimeWindow(0, 3), windowSet.getStateWindow(new TimeWindow(0, 3))); + assertThat(mergeFunction.mergedStateWindows().toArray()) + .satisfiesAnyOf( + o -> assertThat(o).containsExactly(new TimeWindow(10, 13)), + o -> assertThat(o).containsExactly(new TimeWindow(5, 8))); + + 
assertThat(mergeFunction.mergedStateWindows()).doesNotContain(mergeFunction.mergeTarget()); + + assertThat(windowSet.getStateWindow(new TimeWindow(0, 3))).isEqualTo(new TimeWindow(0, 3)); mergeFunction.reset(); - assertEquals( - new TimeWindow(5, 13), windowSet.addWindow(new TimeWindow(5, 8), mergeFunction)); - assertFalse(mergeFunction.hasMerged()); + assertThat(windowSet.addWindow(new TimeWindow(5, 8), mergeFunction)) + .isEqualTo(new TimeWindow(5, 13)); + assertThat(mergeFunction.hasMerged()).isFalse(); mergeFunction.reset(); - assertEquals( - new TimeWindow(5, 13), windowSet.addWindow(new TimeWindow(8, 10), mergeFunction)); - assertFalse(mergeFunction.hasMerged()); + assertThat(windowSet.addWindow(new TimeWindow(8, 10), mergeFunction)) + .isEqualTo(new TimeWindow(5, 13)); + assertThat(mergeFunction.hasMerged()).isFalse(); mergeFunction.reset(); - assertEquals( - new TimeWindow(5, 13), windowSet.addWindow(new TimeWindow(10, 13), mergeFunction)); - assertFalse(mergeFunction.hasMerged()); + assertThat(windowSet.addWindow(new TimeWindow(10, 13), mergeFunction)) + .isEqualTo(new TimeWindow(5, 13)); + assertThat(mergeFunction.hasMerged()).isFalse(); - assertThat( - windowSet.getStateWindow(new TimeWindow(5, 13)), - anyOf(is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13)))); + assertThat(windowSet.getStateWindow(new TimeWindow(5, 13))) + .isIn(new TimeWindow(5, 8), new TimeWindow(10, 13)); // add a window that merges all of them together mergeFunction.reset(); - assertEquals( - new TimeWindow(0, 13), windowSet.addWindow(new TimeWindow(3, 5), mergeFunction)); - assertTrue(mergeFunction.hasMerged()); - assertEquals(new TimeWindow(0, 13), mergeFunction.mergeTarget()); - assertThat( - mergeFunction.stateWindow(), - anyOf( - is(new TimeWindow(0, 3)), - is(new TimeWindow(5, 8)), - is(new TimeWindow(10, 13)))); - assertThat( - mergeFunction.mergeSources(), - containsInAnyOrder(new TimeWindow(0, 3), new TimeWindow(5, 13))); - assertThat( - mergeFunction.mergedStateWindows(), - anyOf( - containsInAnyOrder(new TimeWindow(0, 3)), - containsInAnyOrder(new TimeWindow(5, 8)), - containsInAnyOrder(new TimeWindow(10, 13)))); - assertThat(mergeFunction.mergedStateWindows(), not(hasItem(mergeFunction.mergeTarget()))); - - assertThat( - windowSet.getStateWindow(new TimeWindow(0, 13)), - anyOf( - is(new TimeWindow(0, 3)), - is(new TimeWindow(5, 8)), - is(new TimeWindow(10, 13)))); + assertThat(windowSet.addWindow(new TimeWindow(0, 13), mergeFunction)) Review Comment: This should be `.addWindow(new TimeWindow(3, 5), ...`. 
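That is, keep the original input window from the pre-migration assert, roughly:

```java
// the window being added stays (3, 5); only the expected merge result is (0, 13)
assertThat(windowSet.addWindow(new TimeWindow(3, 5), mergeFunction))
        .isEqualTo(new TimeWindow(0, 13));
```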
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java: ########## @@ -148,14 +142,13 @@ public void testFinish() throws Exception { prefix + ": Mail to put in mailbox when finishing operator"); } - assertArrayEquals( - "Output was not correct.", - expected.subList(2, expected.size()).toArray(), - output.toArray()); + assertThat(output.toArray()) + .as("Output was not correct.") + .isEqualTo(expected.subList(2, expected.size()).toArray()); Review Comment: nit: Maybe leaving them as `Collection`s would be more readable:

```java
assertThat(output)
        .as("Output was not correct.")
        .containsExactlyElementsOf(expected.subList(2, expected.size()));
```

########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java: ########## @@ -130,21 +127,17 @@ public void testSpecialRuleReading() throws Exception { } @Test - public void testReadFinishedInput() throws Exception { - try { - testInputSelection( - new TestReadFinishedInputStreamOperatorFactory(), - true, - new ArrayDeque<>(), - true); - fail("should throw an IOException"); - } catch (Exception t) { - if (!ExceptionUtils.findThrowableWithMessage( - t, "Can not make a progress: all selected inputs are already finished") - .isPresent()) { - throw t; - } - } + void testReadFinishedInput() throws Exception { + assertThatThrownBy( + () -> + testInputSelection( + new TestReadFinishedInputStreamOperatorFactory(), + true, + new ArrayDeque<>(), + true)) + .isInstanceOf(IOException.class) + .hasMessageContaining( + "Can not make a progress: all selected inputs are already finished"); Review Comment: The refactored assert is stricter here, as the previous code did not validate the exception type and succeeded if any exception in the chain contained the given message. Personally, I am okay with this change, just wanted to highlight it.