carp84 commented on a change in pull request #13688: URL: https://github.com/apache/flink/pull/13688#discussion_r511004271
########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java ########## @@ -156,9 +160,49 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor( Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), "The chosen state name 'default' collides with the name of the default column family!"); + if (writeBufferManagerCapacity != null) { + // It'd be great to perform the check earlier, e.g. when creating write buffer manager. + // Unfortunately the check needs write buffer size that was just calculated. + sanityCheckArenaBlockSize(options.writeBufferSize(), options.arenaBlockSize(), writeBufferManagerCapacity); + } + return new ColumnFamilyDescriptor(nameBytes, options); } + /** + * Logs a warning ff the arena block size is too high causing RocksDB to flush constantly. + * Essentially, the condition here + * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47"/> + * will always be true. Review comment: ```suggestion * Essentially, the condition * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47"> * here</a> will always be true. 
``` ########## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java ########## @@ -91,10 +93,33 @@ public void testCreateSharedResourcesWithExpectedCapacity() { long totalMemorySize = 2048L; double writeBufferRatio = 0.5; double highPriPoolRatio = 0.1; - RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize, writeBufferRatio, highPriPoolRatio); + RocksDBSharedResources rocksDBSharedResources = RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize, writeBufferRatio, highPriPoolRatio); long expectedCacheCapacity = RocksDBMemoryControllerUtils.calculateActualCacheCapacity(totalMemorySize, writeBufferRatio); long expectedWbmCapacity = RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(totalMemorySize, writeBufferRatio); + assertThat(actualCacheCapacity.get(), is(expectedCacheCapacity)); assertThat(actualWbmCapacity.get(), is(expectedWbmCapacity)); + assertThat(rocksDBSharedResources.getWriteBufferManagerCapacity(), is(expectedWbmCapacity)); + } + + @Test + public void testCalculateRocksDBDefaultArenaBlockSize() { + long writeBufferSize = 64 * 1024 * 1024; + long expectArenaBlockSize = writeBufferSize / 8; + assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize), is(expectArenaBlockSize)); + } + + @Test + public void testCalculateRocksDBMutableLimit() { + long bufferSize = 64 * 1024 * 1024; + long limit = bufferSize * 7 / 8; + assertThat(RocksDBMemoryControllerUtils.calculateRocksDBMutableLimit(bufferSize), is(limit)); + } + + @Test + public void testValidateArenaBlockSize() { + long arenaBlockSize = 8 * 1024 * 1024; + assertFalse(RocksDBMemoryControllerUtils.validateArenaBlockSize(arenaBlockSize, (long) (arenaBlockSize * 0.5))); + assertTrue(RocksDBMemoryControllerUtils.validateArenaBlockSize(arenaBlockSize, (long) (arenaBlockSize * 1.5))); } Review comment: I'm hesitating on 
adding these tests since they're testing against the implementation instead of any contract. Once the calculation formula changes, these tests will fail and need to be adjusted accordingly. ########## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java ########## @@ -78,6 +81,60 @@ public void testPathExceptionOnWindows() throws Exception { } } + @Test + public void testSanityCheckArenaBlockSize() { + long testWriteBufferSize = 56 * 1024 * 1024L; + long testDefaultArenaSize = testWriteBufferSize / 8; + long testValidArenaSize = testWriteBufferSize / 7; + long testInvalidArenaSize = testWriteBufferSize / 7 - 8L; + List<TestData> tests = Arrays.asList( + new TestData(testWriteBufferSize, 0, testInvalidArenaSize, false), + new TestData(testWriteBufferSize, testDefaultArenaSize, testInvalidArenaSize, false), + new TestData(testWriteBufferSize, 0, testValidArenaSize, true), + new TestData(testWriteBufferSize, testDefaultArenaSize, testValidArenaSize, true) + ); + + for (TestData test : tests) { + long writeBufferSize = test.getWriteBufferSize(); + long arenaBlockSizeConfigured = test.getArenaBlockSizeConfigured(); + long writeBufferManagerCapacity = test.getWriteBufferManagerCapacity(); + boolean expected = test.getExpectedResult(); + + boolean sanityCheckResult = RocksDBOperationUtils.sanityCheckArenaBlockSize(writeBufferSize, arenaBlockSizeConfigured, writeBufferManagerCapacity); + assertThat(sanityCheckResult, is(expected)); + } + } + + private static class TestData { + private final long writeBufferSize; + private final long arenaBlockSizeConfigured; + private final long writeBufferManagerCapacity; + private final boolean expectedResult; + + public TestData(long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity, boolean expectedResult) { + this.writeBufferSize = writeBufferSize; + this.arenaBlockSizeConfigured = arenaBlockSizeConfigured; + 
this.writeBufferManagerCapacity = writeBufferManagerCapacity; + this.expectedResult = expectedResult; + } + + public long getWriteBufferSize() { + return writeBufferSize; + } + + public long getArenaBlockSizeConfigured() { + return arenaBlockSizeConfigured; + } + + public long getWriteBufferManagerCapacity() { + return writeBufferManagerCapacity; + } + + public boolean getExpectedResult() { + return expectedResult; + } + } Review comment: ```suggestion public void testSanityCheckArenaBlockSize() { long testWriteBufferSize = 56 * 1024 * 1024L; long testDefaultArenaSize = RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(testWriteBufferSize); long testWriteBufferCapacityBoundary = testDefaultArenaSize * 8 / 7; assertThat("The sanity check result is incorrect with default arena block size", RocksDBOperationUtils.sanityCheckArenaBlockSize(testWriteBufferSize, 0, testWriteBufferCapacityBoundary), is(true)); assertThat("The sanity check should pass when the configured arena block size is small enough.", RocksDBOperationUtils.sanityCheckArenaBlockSize(testWriteBufferSize, testDefaultArenaSize - 1, testWriteBufferCapacityBoundary), is(true)); assertThat("The sanity check should fail when the configured arena block size is too big.", RocksDBOperationUtils.sanityCheckArenaBlockSize(testWriteBufferSize, testDefaultArenaSize + 1, testWriteBufferCapacityBoundary), is(false)); } ``` Sorry, but I'm still not satisfied with this test; I suggest simplifying it further and adding some hints in case any of the assertions fail. And although this test is also implementation-bound, it checks/guards the result of multiple calculations, so I think we should keep it. We will also need to remove unused imports if the suggestion is accepted. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org