azagrebin commented on a change in pull request #11109:
URL: https://github.com/apache/flink/pull/11109#discussion_r413834138



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.util.JavaGcCleanerWrapper;
+
+import javax.annotation.Nonnegative;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tracker of memory allocation and release within a custom limit.
+ *
+ * <p>This memory management in Flink uses the same approach as Java uses to allocate direct memory
+ * and release it upon GC, but memory here can also be released directly with {@link #releaseMemory(long)}.
+ * The memory reservation {@link #reserveMemory(long)} first tries to run all phantom cleaners, created with
+ * {@link org.apache.flink.core.memory.MemoryUtils#createMemoryGcCleaner(Object, long, Runnable)},
+ * for objects which are ready to be garbage collected. If memory is still not available, it triggers GC and
+ * continues to process any ready cleaners, making {@link #MAX_SLEEPS} attempts before throwing {@link OutOfMemoryError}.
+ */
+class UnsafeMemoryBudget {
+       // max. number of sleeps during try-reserving with exponentially
+       // increasing delay before throwing OutOfMemoryError:
+       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+       // which means that OOME will be thrown after ~1 s of trying
+       private static final int MAX_SLEEPS = 10;
+       private static final int RETRIGGER_GC_AFTER_SLEEPS = 9; // ~ 0.5 sec
+
+       private final long totalMemorySize;
+
+       private final AtomicLong availableMemorySize;
+
+       UnsafeMemoryBudget(long totalMemorySize) {
+               this.totalMemorySize = totalMemorySize;
+               this.availableMemorySize = new AtomicLong(totalMemorySize);
+       }
+
+       long getTotalMemorySize() {
+               return totalMemorySize;
+       }
+
+       long getAvailableMemorySize() {
+               return availableMemorySize.get();
+       }
+
+       boolean verifyEmpty() {
+               try {
+                       reserveMemory(totalMemorySize);
+               } catch (MemoryReservationException e) {
+                       return false;
+               }
+               releaseMemory(totalMemorySize);
+               return availableMemorySize.get() == totalMemorySize;
+       }
+
+       /**
+        * Reserve memory of certain size if it is available.
+        *
+        * <p>Adjusted version of {@link java.nio.Bits#reserveMemory(long, int)} taken from Java 11.
+        */
+       @SuppressWarnings({"OverlyComplexMethod", "JavadocReference", "NestedTryStatement"})
+       void reserveMemory(long size) throws MemoryReservationException {
+               long availableOrReserved = tryReserveMemory(size);
+               // optimist!
+               if (availableOrReserved >= size) {
+                       return;
+               }
+
+               boolean interrupted = false;
+               try {
+
+                       // Retry allocation until success or there are no more
+                       // references (including Cleaners that might free direct
+                       // buffer memory) to process and allocation still fails.
+                       boolean refprocActive;
+                       do {
+                               try {
+                                       refprocActive = JavaGcCleanerWrapper.waitForGcToRunReadyCleaners();
+                               } catch (InterruptedException e) {
+                                       // Defer interrupts and keep trying.
+                                       interrupted = true;
+                                       refprocActive = true;
+                               }
+                               availableOrReserved = tryReserveMemory(size);
+                               if (availableOrReserved >= size) {
+                                       return;
+                               }
+                       } while (refprocActive);
+
+                       // trigger VM's Reference processing
+                       //noinspection CallToSystemGC
+                       System.gc();
+
+                       // A retry loop with exponential back-off delays.
+                       // Sometimes it would suffice to give up once reference
+                       // processing is complete.  But if there are many threads
+                       // competing for memory, this gives more opportunities for
+                       // any given thread to make progress.  In particular, this
+                       // seems to be enough for a stress test like
+                       // DirectBufferAllocTest to (usually) succeed, while
+                       // without it that test likely fails.  Since failure here
+                       // ends in OOME, there's no need to hurry.
+                       // ends in OOME, there's no need to hurry.

Review comment:
       I forgot to change it to `MemoryReservationException`.



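For readers following along outside the diff context, the reserve/back-off pattern in the snippet above can be sketched as follows. This is a simplified, standalone illustration, not the actual Flink code: the class `SimpleBudget` and its method names are invented for this sketch, and the phantom-cleaner draining and `System.gc()` steps of the real implementation are reduced to comments.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of a memory budget with a CAS reservation loop and
// exponential back-off, modeled loosely on the diff above. Names and
// structure are illustrative only.
class SimpleBudget {
    // Sleep 1, 2, 4, ..., 512 ms between attempts (total 1023 ms ~ 1 s).
    private static final int MAX_SLEEPS = 10;

    private final AtomicLong available;

    SimpleBudget(long totalMemorySize) {
        this.available = new AtomicLong(totalMemorySize);
    }

    /** CAS loop: atomically subtract {@code size} if enough memory is free. */
    private long tryReserve(long size) {
        long current;
        do {
            current = available.get();
            if (current < size) {
                return current; // not enough; caller decides whether to retry
            }
        } while (!available.compareAndSet(current, current - size));
        return current; // a value >= size means the reservation succeeded
    }

    void reserve(long size) {
        long sleepMs = 1;
        for (int sleeps = 0; sleeps < MAX_SLEEPS; sleeps++) {
            if (tryReserve(size) >= size) {
                return; // optimist: often succeeds on the first attempt
            }
            // The real code first drains ready phantom cleaners and triggers
            // System.gc() here; this sketch only backs off and retries.
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // defer the interrupt
            }
            sleepMs <<= 1; // exponential back-off
        }
        // Per the review comment, the real class is meant to throw
        // MemoryReservationException here rather than OutOfMemoryError.
        throw new OutOfMemoryError("Could not reserve " + size + " bytes");
    }

    void release(long size) {
        available.addAndGet(size);
    }

    long available() {
        return available.get();
    }
}
```

The `AtomicLong` CAS loop keeps reservation lock-free under contention, which is why the diff's `tryReserveMemory` returns the observed available value rather than a boolean.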
