tkalkirill commented on code in PR #5289:
URL: https://github.com/apache/ignite-3/pull/5289#discussion_r1969628946


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -155,18 +157,14 @@ public class PersistentPageMemory implements PageMemory {
     /** Try again tag. */
     public static final int TRY_AGAIN_TAG = -1;
 
-    /**
-     * Threshold of the checkpoint buffer. We should start forcefully checkpointing its pages upon exceeding it. The value of {@code 2/3} is
-     * ported from {@code Ignite 2.x}.
-     */
-    private static final float CP_BUF_FILL_THRESHOLD = 2.0f / 3;
-
     /** Data region configuration view. */
     private final PersistentPageMemoryProfileView storageProfileView;
 
     /** Page IO registry. */
     private final PageIoRegistry ioRegistry;
 
+    private final Supplier<CheckpointProgress> checkpointProgressSupplier;

Review Comment:
   If the supplier can return null, please declare it as `Supplier<@Nullable CheckpointProgress>`.
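A minimal sketch of what the annotation documents, assuming the supplier returns `null` while no checkpoint is running (as the `progress == null` check in `PagesWriteThrottle.onMarkDirty` suggests). To stay self-contained it declares a local `TYPE_USE` annotation as a stand-in for the project's `@Nullable` and a hypothetical one-method `CheckpointProgress`. The annotation has to be applicable to type uses for `Supplier<@Nullable ...>` to compile.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.function.Supplier;

public class NullableSupplierDemo {
    /** Local stand-in for the project's @Nullable; TYPE_USE allows it inside type arguments. */
    @Target(ElementType.TYPE_USE)
    @interface Nullable {
    }

    /** Hypothetical, reduced version of CheckpointProgress. */
    interface CheckpointProgress {
        int writtenPages();
    }

    /** Returns null while no checkpoint is running; the annotation makes that explicit. */
    private final Supplier<@Nullable CheckpointProgress> checkpointProgressSupplier;

    NullableSupplierDemo(Supplier<@Nullable CheckpointProgress> checkpointProgressSupplier) {
        this.checkpointProgressSupplier = checkpointProgressSupplier;
    }

    /** Callers must handle the null case, as onMarkDirty does. */
    int writtenPagesOrZero() {
        CheckpointProgress progress = checkpointProgressSupplier.get();

        return progress == null ? 0 : progress.writtenPages();
    }

    public static void main(String[] args) {
        System.out.println(new NullableSupplierDemo(() -> null).writtenPagesOrZero()); // 0
        System.out.println(new NullableSupplierDemo(() -> () -> 42).writtenPagesOrZero()); // 42
    }
}
```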



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -282,6 +285,14 @@ public PersistentPageMemory(
         }
 
         delayedPageReplacementTracker = new DelayedPageReplacementTracker(pageSize, flushDirtyPageForReplacement, LOG, sizes.length - 1);
+
+        //        new PagesWriteThrottle(

Review Comment:
   Are these commented-out lines necessary? If they are, please add a comment explaining why they are needed.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -213,6 +210,9 @@ public class PersistentPageMemory implements PageMemory {
     /** Checkpoint page pool, {@code null} if not {@link #start() started}. */
     private volatile @Nullable PagePool checkpointPool;
 
+    /** Pages write throttle. */
+    private final PagesWriteThrottlePolicy writeThrottle;

Review Comment:
   Please add `@Nullable`.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1816,6 +1821,27 @@ public CheckpointUrgency checkpointUrgency() {
         return checkpointUrgency.get();
     }
 
+    /**
+     * Checks if region is sufficiently full.
+     *
+     * @param dirtyRatioThreshold Max allowed dirty pages ratio.
+     */
+    public boolean shouldThrottle(double dirtyRatioThreshold) {
+        Segment[] segments = this.segments;
+
+        if (segments == null) {
+            return false;
+        }
+
+        for (Segment segment : segments) {

Review Comment:
   I think iterating with a counting (indexed) loop would be faster and would avoid allocating an iterator.
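A sketch comparing the two loop shapes for `shouldThrottle`. The loop body is not visible in the hunk, so this assumes, hypothetically, that a region should be throttled when any segment's dirty-page ratio exceeds the threshold; `Segment` here is a stand-in with two counters. One point worth weighing: for arrays (unlike `Iterable`s), the Java Language Specification (§14.14.2) defines the enhanced `for` as an indexed loop over the array, so the enhanced form does not allocate an `Iterator` and any speed difference is likely negligible.

```java
public class ShouldThrottleLoopDemo {
    /** Hypothetical stand-in for a page memory segment. */
    static final class Segment {
        final long dirtyPages;
        final long totalPages;

        Segment(long dirtyPages, long totalPages) {
            this.dirtyPages = dirtyPages;
            this.totalPages = totalPages;
        }

        double dirtyRatio() {
            return (double) dirtyPages / totalPages;
        }
    }

    /** Enhanced-for form, as in the PR; over an array it is specified as an indexed loop. */
    static boolean shouldThrottleForEach(Segment[] segments, double dirtyRatioThreshold) {
        if (segments == null) {
            return false;
        }

        for (Segment segment : segments) {
            if (segment.dirtyRatio() > dirtyRatioThreshold) {
                return true;
            }
        }

        return false;
    }

    /** Counting-loop form suggested in the review. */
    static boolean shouldThrottleIndexed(Segment[] segments, double dirtyRatioThreshold) {
        if (segments == null) {
            return false;
        }

        for (int i = 0; i < segments.length; i++) {
            if (segments[i].dirtyRatio() > dirtyRatioThreshold) {
                return true;
            }
        }

        return false;
    }

    public static void main(String[] args) {
        Segment[] segments = {new Segment(10, 100), new Segment(80, 100)};

        System.out.println(shouldThrottleIndexed(segments, 0.75)); // true
        System.out.println(shouldThrottleForEach(segments, 0.9)); // false
    }
}
```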



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java:
##########
@@ -55,4 +55,9 @@ public interface CheckpointProgress {
      * written.
      */
     @Nullable CheckpointDirtyPages pagesToWrite();
+
+    /**
+     * Returns a number of written checkpoint pages.
+     */

Review Comment:
   ```suggestion
       /** Returns a number of written checkpoint pages. */
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1835,7 +1861,11 @@ public int maxCheckpointBufferPages() {
     }
 
     private void releaseCheckpointBufferPage(long tmpBufPtr) {
-        checkpointPool.releaseFreePage(tmpBufPtr);
+        int resultCounter = checkpointPool.releaseFreePage(tmpBufPtr);
+
+        if (writeThrottle != null && resultCounter == checkpointPool.pages() / 2) {

Review Comment:
   Shouldn't we replace it with `>>> 1`, which is faster?
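A quick check of the equivalence the suggestion relies on. For non-negative `int` values, such as a page count, `x / 2` and `x >>> 1` give the same result; they differ only for negative values, where `/` truncates toward zero and `>>>` shifts a zero into the sign bit. JIT compilers commonly turn division by a constant power of two into shifts already, so the measurable gain may be small; nothing here is benchmarked.

```java
public class HalfPoolCheckDemo {
    /** Division by two, as written in the PR. */
    static int halfByDivision(int pages) {
        return pages / 2;
    }

    /** Unsigned right shift, as suggested in the review. */
    static int halfByShift(int pages) {
        return pages >>> 1;
    }

    public static void main(String[] args) {
        // Identical for every non-negative input.
        for (int pages : new int[] {0, 1, 2, 7, 1024, Integer.MAX_VALUE}) {
            if (halfByDivision(pages) != halfByShift(pages)) {
                throw new AssertionError("mismatch for " + pages);
            }
        }

        // Different for negative input.
        System.out.println(halfByDivision(-7)); // -3
        System.out.println(halfByShift(-7)); // 2147483644
    }
}
```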



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1459,13 +1478,13 @@ private long replacementSize() {
         private void acquirePage(long absPtr) {
             PageHeader.acquirePage(absPtr);
 
-            updateAtomicInt(acquiredPagesPtr, 1);
+            GridUnsafe.incrementAndGetInt(acquiredPagesPtr);

Review Comment:
   Maybe use a static import?
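For illustration only: so that it compiles on its own, the sketch uses a JDK member (`TimeUnit.NANOSECONDS`, which `PagesWriteThrottle` also writes fully qualified) instead of `GridUnsafe.incrementAndGetInt`. A static import lets the call site drop the class qualifier.

```java
import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class StaticImportDemo {
    /** Converts a park time to milliseconds; NANOSECONDS needs no TimeUnit qualifier here. */
    static long parkMillis(long parkNanos) {
        return NANOSECONDS.toMillis(parkNanos);
    }

    public static void main(String[] args) {
        System.out.println(parkMillis(5_000_000L)); // 5
    }
}
```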



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoff.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements exponential backoff logic. Contains a counter and increments it on each {@link #nextDuration()}.
+ * May be reset using {@link #reset()}.
+ */
+class ExponentialBackoff {
+    /**
+     * Starting backoff duration.
+     */
+    private final long startingBackoffNanos;
+
+    /**
+     * Backoff ratio. Each next duration will be this times longer.
+     */
+    private final double backoffRatio;
+
+    /**
+     * Exponential backoff counter.
+     */
+    private final AtomicInteger exponentialBackoffCounter = new AtomicInteger(0);
+
+    /**
+     * Constructs a new instance with the given parameters.
+     *
+     * @param startingBackoffNanos duration of first backoff in nanoseconds
+     * @param backoffRatio         each next duration will be this times longer

Review Comment:
   ```suggestion
        * Constructs a new instance with the given parameters.
        *
        * @param startingBackoffNanos Duration of first backoff in nanoseconds.
        * @param backoffRatio Each next duration will be this times longer.
   ```
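The hunk shows only the fields and the constructor Javadoc, so here is a hedged sketch of how `nextDuration()` and `reset()` could behave with those fields: the n-th call (counting from zero) returns `startingBackoffNanos * backoffRatio^n`, and `reset()` zeroes the counter and reports whether backoff had started (matching how `resetBackoff()` is used as `backoffWasAlreadyStarted` in `PagesWriteThrottle`). The class is named `ExponentialBackoffSketch` to make clear it is not the PR's code, and the formula is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ExponentialBackoffSketch {
    /** Duration of the first backoff, in nanoseconds. */
    private final long startingBackoffNanos;

    /** Each next duration is this many times longer. */
    private final double backoffRatio;

    /** Number of durations handed out since the last reset. */
    private final AtomicInteger exponentialBackoffCounter = new AtomicInteger(0);

    public ExponentialBackoffSketch(long startingBackoffNanos, double backoffRatio) {
        this.startingBackoffNanos = startingBackoffNanos;
        this.backoffRatio = backoffRatio;
    }

    /** Returns startingBackoffNanos * backoffRatio^n, then increments n atomically. */
    public long nextDuration() {
        int exponent = exponentialBackoffCounter.getAndIncrement();

        return (long) (startingBackoffNanos * Math.pow(backoffRatio, exponent));
    }

    /** Resets the counter; returns true if at least one duration had been handed out. */
    public boolean reset() {
        return exponentialBackoffCounter.getAndSet(0) != 0;
    }

    public static void main(String[] args) {
        ExponentialBackoffSketch backoff = new ExponentialBackoffSketch(10, 2.0);

        System.out.println(backoff.nextDuration()); // 10
        System.out.println(backoff.nextDuration()); // 20
        System.out.println(backoff.nextDuration()); // 40
        System.out.println(backoff.reset()); // true
        System.out.println(backoff.nextDuration()); // 10
    }
}
```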



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed.
+ */
+public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
+    /** Logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class);
+
+    /** Page memory. */
+    private final PersistentPageMemory pageMemory;
+
+    /** Database manager. */
+    private final Supplier<CheckpointProgress> cpProgress;
+
+    /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */
+    private final boolean throttleOnlyPagesInCheckpoint;
+
+    /** Checkpoint lock state checker. */
+    private final CheckpointLockStateChecker stateChecker;
+
+    /** In-checkpoint protection logic. */
+    private final ExponentialBackoffThrottlingStrategy inCheckpointProtection
+            = new ExponentialBackoffThrottlingStrategy();
+
+    /** Not-in-checkpoint protection logic. */
+    private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection
+            = new ExponentialBackoffThrottlingStrategy();
+
+    /** Checkpoint Buffer-related logic used to keep it safe. */
+    private final CheckpointBufferOverflowWatchdog cpBufferWatchdog;
+
+    /** Threads that are throttled due to checkpoint buffer overflow. */
+    private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param pageMemory Page memory.
+     * @param cpProgress Database manager.
+     * @param stateChecker checkpoint lock state checker.
+     * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow.
+     */
+    public PagesWriteThrottle(
+            PersistentPageMemory pageMemory,
+            Supplier<CheckpointProgress> cpProgress,
+            CheckpointLockStateChecker stateChecker,
+            boolean throttleOnlyPagesInCheckpoint
+    ) {
+        this.pageMemory = pageMemory;
+        this.cpProgress = cpProgress;
+        this.stateChecker = stateChecker;
+        this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
+        cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);
+
+        assert throttleOnlyPagesInCheckpoint || cpProgress != null
+                : "cpProgress must be not null if ratio based throttling mode is used";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkDirty(boolean isPageInCheckpoint) {
+        assert stateChecker.checkpointLockIsHeldByThread();
+
+        boolean shouldThrottle = false;
+
+        if (isPageInCheckpoint) {
+            shouldThrottle = isCpBufferOverflowThresholdExceeded();
+        }
+
+        if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) {
+            CheckpointProgress progress = cpProgress.get();
+
+            if (progress == null) {
+                return; // Don't throttle if checkpoint is not running.
+            }
+
+            int cpWrittenPages = progress.writtenPages();
+
+            int cpTotalPages = progress.currentCheckpointPagesCount();
+
+            if (cpWrittenPages == cpTotalPages) {
+                // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4
+                shouldThrottle = pageMemory.shouldThrottle(0.75);
+            } else {
+                double dirtyRatioThreshold = ((double) cpWrittenPages) / cpTotalPages;
+
+                // Starting with 0.05 to avoid throttle right after checkpoint start
+                // 7/12 is maximum ratio of dirty pages
+                dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 / 12;
+
+                shouldThrottle = pageMemory.shouldThrottle(dirtyRatioThreshold);
+            }
+        }
+
+        ExponentialBackoffThrottlingStrategy exponentialThrottle = isPageInCheckpoint
+                ? inCheckpointProtection : notInCheckpointProtection;
+
+        if (shouldThrottle) {
+            long throttleParkTimeNs = exponentialThrottle.protectionParkTime();
+
+            Thread curThread = Thread.currentThread();
+
+            if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+                LOG.warn("Parking thread=" + curThread.getName()
+                        + " for timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs));
+            }
+
+            if (isPageInCheckpoint) {
+                cpBufThrottledThreads.put(curThread.getId(), curThread);
+
+                try {
+                    LockSupport.parkNanos(throttleParkTimeNs);
+                } finally {
+                    cpBufThrottledThreads.remove(curThread.getId());
+
+                    if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+                        LOG.warn("Unparking thread=" + curThread.getName()
+                                + " with park timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs));
+                    }
+                }
+            } else {
+                LockSupport.parkNanos(throttleParkTimeNs);
+            }
+        } else {
+            boolean backoffWasAlreadyStarted = exponentialThrottle.resetBackoff();
+
+            if (isPageInCheckpoint && backoffWasAlreadyStarted) {
+                unparkParkedThreads();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void wakeupThrottledThreads() {
+        if (!isCpBufferOverflowThresholdExceeded()) {
+            inCheckpointProtection.resetBackoff();
+
+            unparkParkedThreads();
+        }
+    }
+
+    private void unparkParkedThreads() {
+        cpBufThrottledThreads.values().forEach(LockSupport::unpark);
+    }
+
+    /** {@inheritDoc} */

Review Comment:
   ```suggestion
   ```
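To make the ratio-based branch of `onMarkDirty` easier to review, the sketch below pulls its arithmetic into a pure function with a hypothetical name. It reproduces the formulas from the hunk: 0.75 once all checkpoint pages are written, otherwise `(progress * 0.95 + 0.05) * 7 / 12`. The threshold therefore starts at 0.05 * 7/12 (about 0.029) when the checkpoint begins and approaches 7/12 (about 0.583) as it nears completion. Because the `cpWrittenPages == cpTotalPages` check comes first, a checkpoint with no pages takes the 0.75 branch instead of computing 0.0 / 0.

```java
public class DirtyRatioThresholdDemo {
    /**
     * Returns the dirty-pages ratio above which onMarkDirty would throttle,
     * using the same constants as the PR.
     */
    static double dirtyRatioThreshold(int cpWrittenPages, int cpTotalPages) {
        if (cpWrittenPages == cpTotalPages) {
            // All pages written: the checkpoint is in the fsync stage.
            return 0.75;
        }

        double progress = ((double) cpWrittenPages) / cpTotalPages;

        // Start at 0.05 to avoid throttling right after checkpoint start; cap at 7/12.
        return (progress * 0.95 + 0.05) * 7 / 12;
    }

    public static void main(String[] args) {
        System.out.println(dirtyRatioThreshold(1000, 1000)); // 0.75

        for (int written : new int[] {0, 250, 500, 750, 999}) {
            System.out.printf("written=%d/1000 -> threshold=%.4f%n", written, dirtyRatioThreshold(written, 1000));
        }
    }
}
```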



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java:
##########
@@ -0,0 +1,187 @@
+    public PagesWriteThrottle(
+            PersistentPageMemory pageMemory,
+            Supplier<CheckpointProgress> cpProgress,
+            CheckpointLockStateChecker stateChecker,
+            boolean throttleOnlyPagesInCheckpoint
+    ) {
+        this.pageMemory = pageMemory;
+        this.cpProgress = cpProgress;
+        this.stateChecker = stateChecker;
+        this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
+        cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);
+
+        assert throttleOnlyPagesInCheckpoint || cpProgress != null
+                : "cpProgress must be not null if ratio based throttling mode is used";
+    }
+
+    /** {@inheritDoc} */

Review Comment:
   ```suggestion
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java:
##########
@@ -0,0 +1,187 @@
+        } else {
+            boolean backoffWasAlreadyStarted = exponentialThrottle.resetBackoff();
+
+            if (isPageInCheckpoint && backoffWasAlreadyStarted) {
+                unparkParkedThreads();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */

Review Comment:
   ```suggestion
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid zero dropdowns that can happen if checkpoint buffer is 
overflowed.
+ */
+public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(PagesWriteThrottle.class);
+
+    /** Page memory. */
+    private final PersistentPageMemory pageMemory;
+
+    /** Database manager. */
+    private final Supplier<CheckpointProgress> cpProgress;
+
+    /** If true, throttle will only protect from checkpoint buffer overflow, 
not from dirty pages ratio cap excess. */
+    private final boolean throttleOnlyPagesInCheckpoint;
+
+    /** Checkpoint lock state checker. */
+    private final CheckpointLockStateChecker stateChecker;
+
+    /** In-checkpoint protection logic. */
+    private final ExponentialBackoffThrottlingStrategy inCheckpointProtection
+            = new ExponentialBackoffThrottlingStrategy();
+
+    /** Not-in-checkpoint protection logic. */
+    private final ExponentialBackoffThrottlingStrategy 
notInCheckpointProtection
+            = new ExponentialBackoffThrottlingStrategy();
+
+    /** Checkpoint Buffer-related logic used to keep it safe. */
+    private final CheckpointBufferOverflowWatchdog cpBufferWatchdog;
+
+    /** Threads that are throttled due to checkpoint buffer overflow. */
+    private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new 
ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param pageMemory Page memory.
+     * @param cpProgress Database manager.
+     * @param stateChecker checkpoint lock state checker.
+     * @param throttleOnlyPagesInCheckpoint If true, throttle will only 
protect from checkpoint buffer overflow.
+     */
+    public PagesWriteThrottle(
+            PersistentPageMemory pageMemory,
+            Supplier<CheckpointProgress> cpProgress,
+            CheckpointLockStateChecker stateChecker,
+            boolean throttleOnlyPagesInCheckpoint
+    ) {
+        this.pageMemory = pageMemory;
+        this.cpProgress = cpProgress;
+        this.stateChecker = stateChecker;
+        this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
+        cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);
+
+        assert throttleOnlyPagesInCheckpoint || cpProgress != null
+                : "cpProgress must be not null if ratio based throttling mode 
is used";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkDirty(boolean isPageInCheckpoint) {
+        assert stateChecker.checkpointLockIsHeldByThread();
+
+        boolean shouldThrottle = false;
+
+        if (isPageInCheckpoint) {
+            shouldThrottle = isCpBufferOverflowThresholdExceeded();
+        }
+
+        if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) {
+            CheckpointProgress progress = cpProgress.get();
+
+            if (progress == null) {
+                return; // Don't throttle if checkpoint is not running.
+            }
+
+            int cpWrittenPages = progress.writtenPages();
+
+            int cpTotalPages = progress.currentCheckpointPagesCount();
+
+            if (cpWrittenPages == cpTotalPages) {
+                // Checkpoint is already in fsync stage, increasing maximum 
ratio of dirty pages to 3/4
+                shouldThrottle = pageMemory.shouldThrottle(0.75);
+            } else {
+                double dirtyRatioThreshold = ((double) cpWrittenPages) / 
cpTotalPages;
+
+                // Starting with 0.05 to avoid throttle right after checkpoint 
start
+                // 7/12 is maximum ratio of dirty pages
+                dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 
/ 12;
+
+                shouldThrottle = 
pageMemory.shouldThrottle(dirtyRatioThreshold);
+            }
+        }
+
+        ExponentialBackoffThrottlingStrategy exponentialThrottle = 
isPageInCheckpoint
+                ? inCheckpointProtection : notInCheckpointProtection;
+
+        if (shouldThrottle) {
+            long throttleParkTimeNs = exponentialThrottle.protectionParkTime();
+
+            Thread curThread = Thread.currentThread();
+
+            if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+                LOG.warn("Parking thread=" + curThread.getName()
+                        + " for timeout(ms)=" + 
TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs));
+            }
+
+            if (isPageInCheckpoint) {
+                cpBufThrottledThreads.put(curThread.getId(), curThread);
+
+                try {
+                    LockSupport.parkNanos(throttleParkTimeNs);
+                } finally {
+                    cpBufThrottledThreads.remove(curThread.getId());
+
+                    if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+                        LOG.warn("Unparking thread=" + curThread.getName()
+                                + " with park timeout(ms)=" + 
TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs));
+                    }
+                }
+            } else {
+                LockSupport.parkNanos(throttleParkTimeNs);
+            }
+        } else {
+            boolean backoffWasAlreadyStarted = 
exponentialThrottle.resetBackoff();
+
+            if (isPageInCheckpoint && backoffWasAlreadyStarted) {
+                unparkParkedThreads();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void wakeupThrottledThreads() {
+        if (!isCpBufferOverflowThresholdExceeded()) {
+            inCheckpointProtection.resetBackoff();
+
+            unparkParkedThreads();
+        }
+    }
+
+    private void unparkParkedThreads() {
+        cpBufThrottledThreads.values().forEach(LockSupport::unpark);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBeginCheckpoint() {
+    }
+
+    /** {@inheritDoc} */

Review Comment:
   ```suggestion
   ```
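
The park/unpark handshake in the hunk above (register in `cpBufThrottledThreads`, `LockSupport.parkNanos`, unregister in `finally`, and `unparkParkedThreads()` waking everyone) can be exercised in isolation. The class below, `ParkRegistrySketch`, is a hypothetical stand-alone sketch, not code from the PR. It relies only on the documented `LockSupport` permit semantics: an `unpark` that lands between registration and `parkNanos` is not lost, because it makes the following `parkNanos` return immediately.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical stand-alone version of the register/park/unregister pattern used for throttled writers. */
final class ParkRegistrySketch {
    /** Threads currently parked, keyed by thread id (mirrors cpBufThrottledThreads). */
    private final ConcurrentHashMap<Long, Thread> parked = new ConcurrentHashMap<>();

    /** Parks the calling thread for at most parkNanos, unless another thread calls wakeUpAll(). */
    void parkCurrentThread(long parkNanos) {
        Thread cur = Thread.currentThread();
        parked.put(cur.getId(), cur);

        try {
            LockSupport.parkNanos(parkNanos);
        } finally {
            // Unregister even if parkNanos returned early (unpark, interrupt or spurious wakeup).
            parked.remove(cur.getId());
        }
    }

    /** Wakes every registered thread; an unpark issued before parkNanos makes that parkNanos return at once. */
    void wakeUpAll() {
        parked.values().forEach(LockSupport::unpark);
    }

    int parkedCount() {
        return parked.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ParkRegistrySketch registry = new ParkRegistrySketch();
        Thread writer = new Thread(() -> registry.parkCurrentThread(TimeUnit.SECONDS.toNanos(30)));
        writer.start();

        // Wait until the writer has registered itself (or has already finished).
        while (registry.parkedCount() == 0 && writer.isAlive()) {
            Thread.onSpinWait();
        }

        long start = System.nanoTime();
        registry.wakeUpAll();
        writer.join();

        System.out.println("Writer woke up after ms=" + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
    }
}
```

Without the early `wakeUpAll()`, the writer would normally stay parked for the whole 30-second timeout; the `finally` block is what keeps the registry from leaking entries for threads that wake up on their own.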



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed.
+ */
+public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
+    /** Logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class);
+
+    /** Page memory. */
+    private final PersistentPageMemory pageMemory;
+
+    /** Database manager. */
+    private final Supplier<CheckpointProgress> cpProgress;
+
+    /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */
+    private final boolean throttleOnlyPagesInCheckpoint;
+
+    /** Checkpoint lock state checker. */
+    private final CheckpointLockStateChecker stateChecker;
+
+    /** In-checkpoint protection logic. */
+    private final ExponentialBackoffThrottlingStrategy inCheckpointProtection
+            = new ExponentialBackoffThrottlingStrategy();
+
+    /** Not-in-checkpoint protection logic. */
+    private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection
+            = new ExponentialBackoffThrottlingStrategy();
+
+    /** Checkpoint Buffer-related logic used to keep it safe. */
+    private final CheckpointBufferOverflowWatchdog cpBufferWatchdog;
+
+    /** Threads that are throttled due to checkpoint buffer overflow. */
+    private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param pageMemory Page memory.
+     * @param cpProgress Database manager.
+     * @param stateChecker checkpoint lock state checker.
+     * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow.
+     */
+    public PagesWriteThrottle(
+            PersistentPageMemory pageMemory,
+            Supplier<CheckpointProgress> cpProgress,
+            CheckpointLockStateChecker stateChecker,
+            boolean throttleOnlyPagesInCheckpoint
+    ) {
+        this.pageMemory = pageMemory;
+        this.cpProgress = cpProgress;
+        this.stateChecker = stateChecker;
+        this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
+        cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);
+
+        assert throttleOnlyPagesInCheckpoint || cpProgress != null
+                : "cpProgress must be not null if ratio based throttling mode is used";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkDirty(boolean isPageInCheckpoint) {
+        assert stateChecker.checkpointLockIsHeldByThread();
+
+        boolean shouldThrottle = false;
+
+        if (isPageInCheckpoint) {
+            shouldThrottle = isCpBufferOverflowThresholdExceeded();
+        }
+
+        if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) {
+            CheckpointProgress progress = cpProgress.get();
+
+            if (progress == null) {
+                return; // Don't throttle if checkpoint is not running.
+            }
+
+            int cpWrittenPages = progress.writtenPages();
+
+            int cpTotalPages = progress.currentCheckpointPagesCount();
+
+            if (cpWrittenPages == cpTotalPages) {
+                // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4
+                shouldThrottle = pageMemory.shouldThrottle(0.75);
+            } else {
+                double dirtyRatioThreshold = ((double) cpWrittenPages) / cpTotalPages;
+
+                // Starting with 0.05 to avoid throttle right after checkpoint start
+                // 7/12 is maximum ratio of dirty pages

Review Comment:
   ```suggestion
                   // 7/12 is maximum ratio of dirty pages.
   ```
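
To make the constants in `onMarkDirty` easier to review, the threshold arithmetic can be pulled out into a pure function. `DirtyRatioThresholdSketch` is a hypothetical helper, not part of the PR; the formula and the 0.75 fsync-stage value are copied from the quoted diff. The allowed dirty-page ratio starts at 0.05 * 7/12 (about 0.029) when the checkpoint begins, grows linearly with checkpoint progress towards 7/12 (about 0.583), and is raised to 0.75 once every checkpoint page has been written.

```java
/** Hypothetical stand-alone copy of the dirty-pages threshold arithmetic from PagesWriteThrottle#onMarkDirty. */
final class DirtyRatioThresholdSketch {
    private DirtyRatioThresholdSketch() {
    }

    /** Returns the maximum allowed ratio of dirty pages for the given checkpoint progress. */
    static double dirtyRatioThreshold(int cpWrittenPages, int cpTotalPages) {
        if (cpWrittenPages == cpTotalPages) {
            // fsync stage: all checkpoint pages are written, so allow up to 3/4 dirty pages.
            // This branch also covers an empty checkpoint (0 == 0), so there is no division by zero below.
            return 0.75;
        }

        double progress = ((double) cpWrittenPages) / cpTotalPages;

        // Start at 0.05 so writers are not throttled right after checkpoint start,
        // then scale linearly towards the 7/12 cap.
        return (progress * 0.95 + 0.05) * 7 / 12;
    }

    public static void main(String[] args) {
        int total = 1_000;

        for (int written : new int[] {0, 250, 500, 750, 999, 1_000}) {
            System.out.printf("written=%d/%d -> threshold=%.4f%n", written, total, dirtyRatioThreshold(written, total));
        }
    }
}
```

Having the threshold as a pure function of `(writtenPages, totalPages)` would also make it straightforward to unit-test the boundaries without a running checkpointer.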



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed.
+ */
+public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
+    /** Logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class);
+
+    /** Page memory. */
+    private final PersistentPageMemory pageMemory;
+
+    /** Database manager. */
+    private final Supplier<CheckpointProgress> cpProgress;
+
+    /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */
+    private final boolean throttleOnlyPagesInCheckpoint;
+
+    /** Checkpoint lock state checker. */
+    private final CheckpointLockStateChecker stateChecker;
+
+    /** In-checkpoint protection logic. */
+    private final ExponentialBackoffThrottlingStrategy inCheckpointProtection
+            = new ExponentialBackoffThrottlingStrategy();
+
+    /** Not-in-checkpoint protection logic. */
+    private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection
+            = new ExponentialBackoffThrottlingStrategy();
+
+    /** Checkpoint Buffer-related logic used to keep it safe. */
+    private final CheckpointBufferOverflowWatchdog cpBufferWatchdog;
+
+    /** Threads that are throttled due to checkpoint buffer overflow. */
+    private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param pageMemory Page memory.
+     * @param cpProgress Database manager.
+     * @param stateChecker checkpoint lock state checker.
+     * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow.
+     */
+    public PagesWriteThrottle(
+            PersistentPageMemory pageMemory,
+            Supplier<CheckpointProgress> cpProgress,
+            CheckpointLockStateChecker stateChecker,
+            boolean throttleOnlyPagesInCheckpoint
+    ) {
+        this.pageMemory = pageMemory;
+        this.cpProgress = cpProgress;
+        this.stateChecker = stateChecker;
+        this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
+        cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);
+
+        assert throttleOnlyPagesInCheckpoint || cpProgress != null
+                : "cpProgress must be not null if ratio based throttling mode is used";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkDirty(boolean isPageInCheckpoint) {
+        assert stateChecker.checkpointLockIsHeldByThread();
+
+        boolean shouldThrottle = false;
+
+        if (isPageInCheckpoint) {
+            shouldThrottle = isCpBufferOverflowThresholdExceeded();
+        }
+
+        if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) {
+            CheckpointProgress progress = cpProgress.get();
+
+            if (progress == null) {
+                return; // Don't throttle if checkpoint is not running.
+            }
+
+            int cpWrittenPages = progress.writtenPages();
+
+            int cpTotalPages = progress.currentCheckpointPagesCount();
+
+            if (cpWrittenPages == cpTotalPages) {
+                // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4
+                shouldThrottle = pageMemory.shouldThrottle(0.75);
+            } else {
+                double dirtyRatioThreshold = ((double) cpWrittenPages) / cpTotalPages;
+
+                // Starting with 0.05 to avoid throttle right after checkpoint start
+                // 7/12 is maximum ratio of dirty pages
+                dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 / 12;
+
+                shouldThrottle = pageMemory.shouldThrottle(dirtyRatioThreshold);
+            }
+        }
+
+        ExponentialBackoffThrottlingStrategy exponentialThrottle = isPageInCheckpoint
+                ? inCheckpointProtection : notInCheckpointProtection;
+
+        if (shouldThrottle) {
+            long throttleParkTimeNs = exponentialThrottle.protectionParkTime();
+
+            Thread curThread = Thread.currentThread();
+
+            if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+                LOG.warn("Parking thread=" + curThread.getName()
+                        + " for timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs));
+            }
+
+            if (isPageInCheckpoint) {
+                cpBufThrottledThreads.put(curThread.getId(), curThread);
+
+                try {
+                    LockSupport.parkNanos(throttleParkTimeNs);
+                } finally {
+                    cpBufThrottledThreads.remove(curThread.getId());
+
+                    if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+                        LOG.warn("Unparking thread=" + curThread.getName()
+                                + " with park timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs));
+                    }
+                }
+            } else {
+                LockSupport.parkNanos(throttleParkTimeNs);
+            }
+        } else {
+            boolean backoffWasAlreadyStarted = exponentialThrottle.resetBackoff();
+
+            if (isPageInCheckpoint && backoffWasAlreadyStarted) {
+                unparkParkedThreads();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void wakeupThrottledThreads() {
+        if (!isCpBufferOverflowThresholdExceeded()) {
+            inCheckpointProtection.resetBackoff();
+
+            unparkParkedThreads();
+        }
+    }
+
+    private void unparkParkedThreads() {
+        cpBufThrottledThreads.values().forEach(LockSupport::unpark);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBeginCheckpoint() {
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onFinishCheckpoint() {
+        inCheckpointProtection.resetBackoff();
+        notInCheckpointProtection.resetBackoff();
+    }
+
+    /** {@inheritDoc} */

Review Comment:
   ```suggestion
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoff.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements exponential backoff logic. Contains a counter and increments it on each {@link #nextDuration()}.
+ * May be reset using {@link #reset()}.
+ */
+class ExponentialBackoff {
+    /**
+     * Starting backoff duration.
+     */
+    private final long startingBackoffNanos;
+
+    /**
+     * Backoff ratio. Each next duration will be this times longer.
+     */
+    private final double backoffRatio;
+
+    /**
+     * Exponential backoff counter.
+     */
+    private final AtomicInteger exponentialBackoffCounter = new AtomicInteger(0);
+
+    /**
+     * Constructs a new instance with the given parameters.
+     *
+     * @param startingBackoffNanos duration of first backoff in nanoseconds
+     * @param backoffRatio         each next duration will be this times longer
+     */
+    public ExponentialBackoff(long startingBackoffNanos, double backoffRatio) {
+        this.startingBackoffNanos = startingBackoffNanos;
+        this.backoffRatio = backoffRatio;
+    }
+
+    /**
+     * Returns next backoff duration (in nanoseconds). As a side effect, increments the backoff counter so that
+     * next call will return a longer duration.
+     *
+     * @return next backoff duration in nanoseconds
+     */
+    public long nextDuration() {
+        int exponent = exponentialBackoffCounter.getAndIncrement();
+        return (long) (startingBackoffNanos * Math.pow(backoffRatio, exponent));
+    }
+
+    /**
+     * Resets the exponential backoff counter so that next call to {@link #nextDuration()}
+     * will return {@link #startingBackoffNanos}.
+     *
+     * @return {@code true} iff this backoff was not already in a reset state

Review Comment:
   ```suggestion
        * @return {@code true} iff this backoff was not already in a reset state.
   ```
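
The counter-based backoff quoted above can be checked with a small stand-alone copy. `ExponentialBackoffSketch` is hypothetical: it reproduces `nextDuration()` from the diff, but the body of `reset()` is not visible in the quoted hunk, so `getAndSet(0) != 0` is an assumption chosen to match its Javadoc (returns `true` iff the backoff was not already in a reset state).

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-alone copy of the ExponentialBackoff logic quoted in the diff. */
final class ExponentialBackoffSketch {
    private final long startingBackoffNanos;
    private final double backoffRatio;

    /** Number of durations handed out since the last reset; used as the exponent. */
    private final AtomicInteger counter = new AtomicInteger(0);

    ExponentialBackoffSketch(long startingBackoffNanos, double backoffRatio) {
        this.startingBackoffNanos = startingBackoffNanos;
        this.backoffRatio = backoffRatio;
    }

    /** Returns startingBackoffNanos * backoffRatio^n for the n-th call (0-based) since the last reset. */
    long nextDuration() {
        int exponent = counter.getAndIncrement();
        return (long) (startingBackoffNanos * Math.pow(backoffRatio, exponent));
    }

    /** Assumed implementation: returns true iff at least one duration was handed out since the last reset. */
    boolean reset() {
        return counter.getAndSet(0) != 0;
    }

    public static void main(String[] args) {
        ExponentialBackoffSketch backoff = new ExponentialBackoffSketch(1_000, 2.0);

        System.out.println(backoff.nextDuration()); // 1000
        System.out.println(backoff.nextDuration()); // 2000
        System.out.println(backoff.nextDuration()); // 4000

        System.out.println(backoff.reset()); // true: backoff had started
        System.out.println(backoff.reset()); // false: already in reset state
        System.out.println(backoff.nextDuration()); // 1000 again
    }
}
```

Because `Math.pow` grows without bound, callers may want to cap the returned duration; note that the `(long)` cast saturates at `Long.MAX_VALUE` for very large doubles rather than wrapping around.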



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/CheckpointBufferOverflowWatchdog.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import static org.apache.ignite.internal.pagememory.persistence.throttling.PagesWriteThrottlePolicy.CP_BUF_FILL_THRESHOLD;
+
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+
+/**
+ * Logic used to determine whether Checkpoint Buffer is in danger zone and writer threads should be throttled.
+ */
+class CheckpointBufferOverflowWatchdog {
+    /** Page memory. */
+    private final PersistentPageMemory pageMemory;
+
+    /**
+     * Creates a new instance.
+     *
+     * @param pageMemory page memory to use
+     */
+    CheckpointBufferOverflowWatchdog(PersistentPageMemory pageMemory) {
+        this.pageMemory = pageMemory;
+    }
+
+    /**
+     * Returns true if Checkpoint Buffer is in danger zone (more than
+     * {@link PagesWriteThrottlePolicy#CP_BUF_FILL_THRESHOLD} of the buffer is filled) and, hence, writer threads need
+     * to be throttled.
+     *
+     * @return {@code true} iff Checkpoint Buffer is in danger zone

Review Comment:
   ```suggestion
        * @return {@code true} iff Checkpoint Buffer is in danger zone.
   ```
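
The danger-zone check described in this Javadoc reduces to a single comparison against `CP_BUF_FILL_THRESHOLD` (2/3 in the diff). The method body is not part of the quoted hunk, so the sketch below is an assumption: `CpBufferDangerZoneSketch` and its `usedPages`/`maxPages` parameters are hypothetical stand-ins for whatever Checkpoint Buffer counters `PersistentPageMemory` actually exposes.

```java
/** Hypothetical stand-alone version of the Checkpoint Buffer danger-zone check. */
final class CpBufferDangerZoneSketch {
    /** Same value as PagesWriteThrottlePolicy.CP_BUF_FILL_THRESHOLD in the diff. */
    static final float CP_BUF_FILL_THRESHOLD = 2f / 3;

    private CpBufferDangerZoneSketch() {
    }

    /**
     * Returns {@code true} iff more than CP_BUF_FILL_THRESHOLD of the Checkpoint Buffer is in use.
     *
     * @param usedPages Pages currently held in the Checkpoint Buffer (assumed input).
     * @param maxPages Checkpoint Buffer capacity in pages (assumed input).
     */
    static boolean isInDangerZone(int usedPages, int maxPages) {
        return usedPages > maxPages * CP_BUF_FILL_THRESHOLD;
    }

    public static void main(String[] args) {
        int maxPages = 3_000;

        System.out.println(isInDangerZone(1_500, maxPages)); // false: half full
        System.out.println(isInDangerZone(2_500, maxPages)); // true: above 2/3
    }
}
```

Since `2f / 3` is a `float`, the comparison is done in float arithmetic, so behaviour exactly at the 2/3 boundary depends on rounding; that is usually harmless for a throttling heuristic.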



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/CheckpointBufferOverflowWatchdog.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import static org.apache.ignite.internal.pagememory.persistence.throttling.PagesWriteThrottlePolicy.CP_BUF_FILL_THRESHOLD;
+
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+
+/**
+ * Logic used to determine whether Checkpoint Buffer is in danger zone and writer threads should be throttled.
+ */
+class CheckpointBufferOverflowWatchdog {
+    /** Page memory. */
+    private final PersistentPageMemory pageMemory;
+
+    /**
+     * Creates a new instance.
+     *
+     * @param pageMemory page memory to use

Review Comment:
   ```suggestion
        * @param pageMemory Page memory to use.
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed.
+ */
+public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
+    /** Logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class);
+
+    /** Page memory. */
+    private final PersistentPageMemory pageMemory;
+
+    /** Database manager. */
+    private final Supplier<CheckpointProgress> cpProgress;
+
+    /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */
+    private final boolean throttleOnlyPagesInCheckpoint;
+
+    /** Checkpoint lock state checker. */
+    private final CheckpointLockStateChecker stateChecker;
+
+    /** In-checkpoint protection logic. */
+    private final ExponentialBackoffThrottlingStrategy inCheckpointProtection
+            = new ExponentialBackoffThrottlingStrategy();
+
+    /** Not-in-checkpoint protection logic. */
+    private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection
+            = new ExponentialBackoffThrottlingStrategy();
+
+    /** Checkpoint Buffer-related logic used to keep it safe. */
+    private final CheckpointBufferOverflowWatchdog cpBufferWatchdog;
+
+    /** Threads that are throttled due to checkpoint buffer overflow. */
+    private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param pageMemory Page memory.
+     * @param cpProgress Database manager.
+     * @param stateChecker checkpoint lock state checker.
+     * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow.

Review Comment:
   ```suggestion
        * @param throttleOnlyPagesInCheckpoint If {@code true}, throttle will only protect from checkpoint buffer overflow.
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottlePolicy.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+/**
+ * Throttling policy, encapsulates logic of delaying write operations.
+ *
+ * <p>There are two resources that get (or might get) consumed when writing:
+ * <ul>
+ *     <li>
+ *         <b>Checkpoint Buffer</b> where a page is placed when, being under checkpoint, it gets written
+ *     </li>
+ *     <li>
+ *         <b>Clean pages</b> which get dirtied when writes occur
+ *     </li>
+ * </ul>
+ * Both resources are limited in size. Both are freed when checkpoint finishes. This means that, if writers
+ * write too fast, they can consume any of these two resources before we have a chance to finish a checkpoint.
+ * If this happens, the cluster fails or stalls.
+ *
+ * <p>Write throttling solves this problem by slowing down the writers to a rate at which they do not exhaust
+ * any of the two resources.
+ *
+ * <p>An alternative to just slowing down is to wait in a loop till the resource we're after gets freed, and
+ * only then allow the write to happen. The problem with this approach is that we cannot wait in a loop/sleep
+ * under a write lock, so the logic would be a lot more complicated. Maybe in the future we'll follow this path,
+ * but for now, a simpler approach of just throttling is used (see below).
+ *
+ * <p>If we just slow writers down by throttling their writes, AND we have enough Checkpoint Buffer and pages in
+ * segments to take some load bursts, we are fine. Under such assumptions, it does not matter whether we throttle
+ * a writer thread before acquiring write lock or after it gets released; in the current implementation, this
+ * happens after write lock gets released (because it was considered simpler to implement).
+ *
+ * <p>The actual throttling happens when a page gets marked dirty by calling {@link #onMarkDirty(boolean)}.
+ *
+ * <p>There are two additional methods for interfacing with other parts of the system:
+ * <ul>
+ *     <li>{@link #wakeupThrottledThreads()} which wakes up the threads currently being throttled; in the current
+ *     implementation, it is called  when Checkpoint Buffer utilization falls below 1/2.</li>
+ *     <li>{@link #isCpBufferOverflowThresholdExceeded()} which is called by a checkpointer to see whether the Checkpoint Buffer is
+ *     in a danger zone and, if yes, it starts to prioritize writing pages from the Checkpoint Buffer over
+ *     pages from the normal checkpoint sequence.</li>
+ * </ul>
+ */
+public interface PagesWriteThrottlePolicy {
+    // TODO Maybe make it configurable in IGNITE-24548
+    /** Min park time which triggers logging. */
+    long LOGGING_THRESHOLD = 10;
+
+    /** Checkpoint buffer fullfill upper bound. */
+    float CP_BUF_FILL_THRESHOLD = 2f / 3;
+
+    /**
+     * Callback to apply throttling delay.
+     *
+     * @param isPageInCheckpoint flag indicating if current page is in scope of current checkpoint.

Review Comment:
   ```suggestion
        * @param isPageInCheckpoint Flag indicating if current page is in scope of current checkpoint.
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoff.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements exponential backoff logic. Contains a counter and increments it on each {@link #nextDuration()}.
+ * May be reset using {@link #reset()}.
+ */
+class ExponentialBackoff {
+    /**
+     * Starting backoff duration.
+     */
+    private final long startingBackoffNanos;
+
+    /**
+     * Backoff ratio. Each next duration will be this times longer.
+     */
+    private final double backoffRatio;
+
+    /**
+     * Exponential backoff counter.
+     */
+    private final AtomicInteger exponentialBackoffCounter = new AtomicInteger(0);
+
+    /**
+     * Constructs a new instance with the given parameters.
+     *
+     * @param startingBackoffNanos duration of first backoff in nanoseconds
+     * @param backoffRatio         each next duration will be this times longer
+     */
+    public ExponentialBackoff(long startingBackoffNanos, double backoffRatio) {
+        this.startingBackoffNanos = startingBackoffNanos;
+        this.backoffRatio = backoffRatio;
+    }
+
+    /**
+     * Returns next backoff duration (in nanoseconds). As a side effect, increments the backoff counter so that
+     * next call will return a longer duration.
+     *
+     * @return next backoff duration in nanoseconds

Review Comment:
   ```suggestion
        * @return Next backoff duration in nanoseconds.
   ```



-- 
This is an automated message from the Apache Git Service.