tkalkirill commented on code in PR #5289: URL: https://github.com/apache/ignite-3/pull/5289#discussion_r1969628946
########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java: ########## @@ -155,18 +157,14 @@ public class PersistentPageMemory implements PageMemory { /** Try again tag. */ public static final int TRY_AGAIN_TAG = -1; - /** - * Threshold of the checkpoint buffer. We should start forcefully checkpointing its pages upon exceeding it. The value of {@code 2/3} is - * ported from {@code Ignite 2.x}. - */ - private static final float CP_BUF_FILL_THRESHOLD = 2.0f / 3; - /** Data region configuration view. */ private final PersistentPageMemoryProfileView storageProfileView; /** Page IO registry. */ private final PageIoRegistry ioRegistry; + private final Supplier<CheckpointProgress> checkpointProgressSupplier; Review Comment: If it (the supplier) can return null, please add `Supplier<@Nullable CheckpointProgress>`. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java: ########## @@ -282,6 +285,14 @@ public PersistentPageMemory( } delayedPageReplacementTracker = new DelayedPageReplacementTracker(pageSize, flushDirtyPageForReplacement, LOG, sizes.length - 1); + + // new PagesWriteThrottle( Review Comment: Are these comments necessary? If so, please add a note explaining why they are needed. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java: ########## @@ -213,6 +210,9 @@ public class PersistentPageMemory implements PageMemory { /** Checkpoint page pool, {@code null} if not {@link #start() started}. */ private volatile @Nullable PagePool checkpointPool; + /** Pages write throttle. */ + private final PagesWriteThrottlePolicy writeThrottle; Review Comment: Please add `@Nullable`. 
########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java: ########## @@ -1816,6 +1821,27 @@ public CheckpointUrgency checkpointUrgency() { return checkpointUrgency.get(); } + /** + * Checks if region is sufficiently full. + * + * @param dirtyRatioThreshold Max allowed dirty pages ration. + */ + public boolean shouldThrottle(double dirtyRatioThreshold) { + Segment[] segments = this.segments; + + if (segments == null) { + return false; + } + + for (Segment segment : segments) { Review Comment: I think iterating with an indexed (counting) loop here would be faster and would avoid an additional iterator allocation. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java: ########## @@ -55,4 +55,9 @@ public interface CheckpointProgress { * written. */ @Nullable CheckpointDirtyPages pagesToWrite(); + + /** + * Returns a number of written checkpoint pages. + */ Review Comment: ```suggestion /** Returns a number of written checkpoint pages. */ ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java: ########## @@ -1835,7 +1861,11 @@ public int maxCheckpointBufferPages() { } private void releaseCheckpointBufferPage(long tmpBufPtr) { - checkpointPool.releaseFreePage(tmpBufPtr); + int resultCounter = checkpointPool.releaseFreePage(tmpBufPtr); + + if (writeThrottle != null && resultCounter == checkpointPool.pages() / 2) { Review Comment: Shouldn't we replace it with `>>> 1`, which is faster? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java: ########## @@ -1459,13 +1478,13 @@ private long replacementSize() { private void acquirePage(long absPtr) { PageHeader.acquirePage(absPtr); - updateAtomicInt(acquiredPagesPtr, 1); + GridUnsafe.incrementAndGetInt(acquiredPagesPtr); Review Comment: Maybe use static import? 
########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoff.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Implements exponential backoff logic. Contains a counter and increments it on each {@link #nextDuration()}. + * May be reset using {@link #reset()}. + */ +class ExponentialBackoff { + /** + * Starting backoff duration. + */ + private final long startingBackoffNanos; + + /** + * Backoff ratio. Each next duration will be this times longer. + */ + private final double backoffRatio; + + /** + * Exponential backoff counter. + */ + private final AtomicInteger exponentialBackoffCounter = new AtomicInteger(0); + + /** + * Constructs a new instance with the given parameters. + * + * @param startingBackoffNanos duration of first backoff in nanoseconds + * @param backoffRatio each next duration will be this times longer Review Comment: ```suggestion * Constructs a new instance with the given parameters. * * @param startingBackoffNanos Duration of first backoff in nanoseconds. 
* @param backoffRatio Each next duration will be this times longer. ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; +import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress; + +/** + * Throttles threads that generate dirty pages during ongoing checkpoint. + * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. + */ +public class PagesWriteThrottle implements PagesWriteThrottlePolicy { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class); + + /** Page memory. 
*/ + private final PersistentPageMemory pageMemory; + + /** Database manager. */ + private final Supplier<CheckpointProgress> cpProgress; + + /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */ + private final boolean throttleOnlyPagesInCheckpoint; + + /** Checkpoint lock state checker. */ + private final CheckpointLockStateChecker stateChecker; + + /** In-checkpoint protection logic. */ + private final ExponentialBackoffThrottlingStrategy inCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Not-in-checkpoint protection logic. */ + private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Checkpoint Buffer-related logic used to keep it safe. */ + private final CheckpointBufferOverflowWatchdog cpBufferWatchdog; + + /** Threads that are throttled due to checkpoint buffer overflow. */ + private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>(); + + /** + * Constructor. + * + * @param pageMemory Page memory. + * @param cpProgress Database manager. + * @param stateChecker checkpoint lock state checker. + * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow. 
+ */ + public PagesWriteThrottle( + PersistentPageMemory pageMemory, + Supplier<CheckpointProgress> cpProgress, + CheckpointLockStateChecker stateChecker, + boolean throttleOnlyPagesInCheckpoint + ) { + this.pageMemory = pageMemory; + this.cpProgress = cpProgress; + this.stateChecker = stateChecker; + this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint; + cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory); + + assert throttleOnlyPagesInCheckpoint || cpProgress != null + : "cpProgress must be not null if ratio based throttling mode is used"; + } + + /** {@inheritDoc} */ + @Override public void onMarkDirty(boolean isPageInCheckpoint) { + assert stateChecker.checkpointLockIsHeldByThread(); + + boolean shouldThrottle = false; + + if (isPageInCheckpoint) { + shouldThrottle = isCpBufferOverflowThresholdExceeded(); + } + + if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) { + CheckpointProgress progress = cpProgress.get(); + + if (progress == null) { + return; // Don't throttle if checkpoint is not running. + } + + int cpWrittenPages = progress.writtenPages(); + + int cpTotalPages = progress.currentCheckpointPagesCount(); + + if (cpWrittenPages == cpTotalPages) { + // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4 + shouldThrottle = pageMemory.shouldThrottle(0.75); + } else { + double dirtyRatioThreshold = ((double) cpWrittenPages) / cpTotalPages; + + // Starting with 0.05 to avoid throttle right after checkpoint start + // 7/12 is maximum ratio of dirty pages + dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 / 12; + + shouldThrottle = pageMemory.shouldThrottle(dirtyRatioThreshold); + } + } + + ExponentialBackoffThrottlingStrategy exponentialThrottle = isPageInCheckpoint + ? 
inCheckpointProtection : notInCheckpointProtection; + + if (shouldThrottle) { + long throttleParkTimeNs = exponentialThrottle.protectionParkTime(); + + Thread curThread = Thread.currentThread(); + + if (throttleParkTimeNs > LOGGING_THRESHOLD) { + LOG.warn("Parking thread=" + curThread.getName() + + " for timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs)); + } + + if (isPageInCheckpoint) { + cpBufThrottledThreads.put(curThread.getId(), curThread); + + try { + LockSupport.parkNanos(throttleParkTimeNs); + } finally { + cpBufThrottledThreads.remove(curThread.getId()); + + if (throttleParkTimeNs > LOGGING_THRESHOLD) { + LOG.warn("Unparking thread=" + curThread.getName() + + " with park timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs)); + } + } + } else { + LockSupport.parkNanos(throttleParkTimeNs); + } + } else { + boolean backoffWasAlreadyStarted = exponentialThrottle.resetBackoff(); + + if (isPageInCheckpoint && backoffWasAlreadyStarted) { + unparkParkedThreads(); + } + } + } + + /** {@inheritDoc} */ + @Override public void wakeupThrottledThreads() { + if (!isCpBufferOverflowThresholdExceeded()) { + inCheckpointProtection.resetBackoff(); + + unparkParkedThreads(); + } + } + + private void unparkParkedThreads() { + cpBufThrottledThreads.values().forEach(LockSupport::unpark); + } + + /** {@inheritDoc} */ Review Comment: ```suggestion ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; +import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress; + +/** + * Throttles threads that generate dirty pages during ongoing checkpoint. + * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. + */ +public class PagesWriteThrottle implements PagesWriteThrottlePolicy { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class); + + /** Page memory. */ + private final PersistentPageMemory pageMemory; + + /** Database manager. */ + private final Supplier<CheckpointProgress> cpProgress; + + /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */ + private final boolean throttleOnlyPagesInCheckpoint; + + /** Checkpoint lock state checker. */ + private final CheckpointLockStateChecker stateChecker; + + /** In-checkpoint protection logic. */ + private final ExponentialBackoffThrottlingStrategy inCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Not-in-checkpoint protection logic. 
*/ + private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Checkpoint Buffer-related logic used to keep it safe. */ + private final CheckpointBufferOverflowWatchdog cpBufferWatchdog; + + /** Threads that are throttled due to checkpoint buffer overflow. */ + private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>(); + + /** + * Constructor. + * + * @param pageMemory Page memory. + * @param cpProgress Database manager. + * @param stateChecker checkpoint lock state checker. + * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow. + */ + public PagesWriteThrottle( + PersistentPageMemory pageMemory, + Supplier<CheckpointProgress> cpProgress, + CheckpointLockStateChecker stateChecker, + boolean throttleOnlyPagesInCheckpoint + ) { + this.pageMemory = pageMemory; + this.cpProgress = cpProgress; + this.stateChecker = stateChecker; + this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint; + cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory); + + assert throttleOnlyPagesInCheckpoint || cpProgress != null + : "cpProgress must be not null if ratio based throttling mode is used"; + } + + /** {@inheritDoc} */ Review Comment: ```suggestion ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; +import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress; + +/** + * Throttles threads that generate dirty pages during ongoing checkpoint. + * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. + */ +public class PagesWriteThrottle implements PagesWriteThrottlePolicy { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class); + + /** Page memory. */ + private final PersistentPageMemory pageMemory; + + /** Database manager. */ + private final Supplier<CheckpointProgress> cpProgress; + + /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */ + private final boolean throttleOnlyPagesInCheckpoint; + + /** Checkpoint lock state checker. */ + private final CheckpointLockStateChecker stateChecker; + + /** In-checkpoint protection logic. */ + private final ExponentialBackoffThrottlingStrategy inCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Not-in-checkpoint protection logic. 
*/ + private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Checkpoint Buffer-related logic used to keep it safe. */ + private final CheckpointBufferOverflowWatchdog cpBufferWatchdog; + + /** Threads that are throttled due to checkpoint buffer overflow. */ + private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>(); + + /** + * Constructor. + * + * @param pageMemory Page memory. + * @param cpProgress Database manager. + * @param stateChecker checkpoint lock state checker. + * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow. + */ + public PagesWriteThrottle( + PersistentPageMemory pageMemory, + Supplier<CheckpointProgress> cpProgress, + CheckpointLockStateChecker stateChecker, + boolean throttleOnlyPagesInCheckpoint + ) { + this.pageMemory = pageMemory; + this.cpProgress = cpProgress; + this.stateChecker = stateChecker; + this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint; + cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory); + + assert throttleOnlyPagesInCheckpoint || cpProgress != null + : "cpProgress must be not null if ratio based throttling mode is used"; + } + + /** {@inheritDoc} */ + @Override public void onMarkDirty(boolean isPageInCheckpoint) { + assert stateChecker.checkpointLockIsHeldByThread(); + + boolean shouldThrottle = false; + + if (isPageInCheckpoint) { + shouldThrottle = isCpBufferOverflowThresholdExceeded(); + } + + if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) { + CheckpointProgress progress = cpProgress.get(); + + if (progress == null) { + return; // Don't throttle if checkpoint is not running. 
+ } + + int cpWrittenPages = progress.writtenPages(); + + int cpTotalPages = progress.currentCheckpointPagesCount(); + + if (cpWrittenPages == cpTotalPages) { + // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4 + shouldThrottle = pageMemory.shouldThrottle(0.75); + } else { + double dirtyRatioThreshold = ((double) cpWrittenPages) / cpTotalPages; + + // Starting with 0.05 to avoid throttle right after checkpoint start + // 7/12 is maximum ratio of dirty pages + dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 / 12; + + shouldThrottle = pageMemory.shouldThrottle(dirtyRatioThreshold); + } + } + + ExponentialBackoffThrottlingStrategy exponentialThrottle = isPageInCheckpoint + ? inCheckpointProtection : notInCheckpointProtection; + + if (shouldThrottle) { + long throttleParkTimeNs = exponentialThrottle.protectionParkTime(); + + Thread curThread = Thread.currentThread(); + + if (throttleParkTimeNs > LOGGING_THRESHOLD) { + LOG.warn("Parking thread=" + curThread.getName() + + " for timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs)); + } + + if (isPageInCheckpoint) { + cpBufThrottledThreads.put(curThread.getId(), curThread); + + try { + LockSupport.parkNanos(throttleParkTimeNs); + } finally { + cpBufThrottledThreads.remove(curThread.getId()); + + if (throttleParkTimeNs > LOGGING_THRESHOLD) { + LOG.warn("Unparking thread=" + curThread.getName() + + " with park timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs)); + } + } + } else { + LockSupport.parkNanos(throttleParkTimeNs); + } + } else { + boolean backoffWasAlreadyStarted = exponentialThrottle.resetBackoff(); + + if (isPageInCheckpoint && backoffWasAlreadyStarted) { + unparkParkedThreads(); + } + } + } + + /** {@inheritDoc} */ Review Comment: ```suggestion ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; +import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress; + +/** + * Throttles threads that generate dirty pages during ongoing checkpoint. + * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. + */ +public class PagesWriteThrottle implements PagesWriteThrottlePolicy { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class); + + /** Page memory. */ + private final PersistentPageMemory pageMemory; + + /** Database manager. */ + private final Supplier<CheckpointProgress> cpProgress; + + /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. 
*/ + private final boolean throttleOnlyPagesInCheckpoint; + + /** Checkpoint lock state checker. */ + private final CheckpointLockStateChecker stateChecker; + + /** In-checkpoint protection logic. */ + private final ExponentialBackoffThrottlingStrategy inCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Not-in-checkpoint protection logic. */ + private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Checkpoint Buffer-related logic used to keep it safe. */ + private final CheckpointBufferOverflowWatchdog cpBufferWatchdog; + + /** Threads that are throttled due to checkpoint buffer overflow. */ + private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>(); + + /** + * Constructor. + * + * @param pageMemory Page memory. + * @param cpProgress Database manager. + * @param stateChecker checkpoint lock state checker. + * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow. 
+ */ + public PagesWriteThrottle( + PersistentPageMemory pageMemory, + Supplier<CheckpointProgress> cpProgress, + CheckpointLockStateChecker stateChecker, + boolean throttleOnlyPagesInCheckpoint + ) { + this.pageMemory = pageMemory; + this.cpProgress = cpProgress; + this.stateChecker = stateChecker; + this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint; + cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory); + + assert throttleOnlyPagesInCheckpoint || cpProgress != null + : "cpProgress must be not null if ratio based throttling mode is used"; + } + + /** {@inheritDoc} */ + @Override public void onMarkDirty(boolean isPageInCheckpoint) { + assert stateChecker.checkpointLockIsHeldByThread(); + + boolean shouldThrottle = false; + + if (isPageInCheckpoint) { + shouldThrottle = isCpBufferOverflowThresholdExceeded(); + } + + if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) { + CheckpointProgress progress = cpProgress.get(); + + if (progress == null) { + return; // Don't throttle if checkpoint is not running. + } + + int cpWrittenPages = progress.writtenPages(); + + int cpTotalPages = progress.currentCheckpointPagesCount(); + + if (cpWrittenPages == cpTotalPages) { + // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4 + shouldThrottle = pageMemory.shouldThrottle(0.75); + } else { + double dirtyRatioThreshold = ((double) cpWrittenPages) / cpTotalPages; + + // Starting with 0.05 to avoid throttle right after checkpoint start + // 7/12 is maximum ratio of dirty pages + dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 / 12; + + shouldThrottle = pageMemory.shouldThrottle(dirtyRatioThreshold); + } + } + + ExponentialBackoffThrottlingStrategy exponentialThrottle = isPageInCheckpoint + ? 
inCheckpointProtection : notInCheckpointProtection; + + if (shouldThrottle) { + long throttleParkTimeNs = exponentialThrottle.protectionParkTime(); + + Thread curThread = Thread.currentThread(); + + if (throttleParkTimeNs > LOGGING_THRESHOLD) { + LOG.warn("Parking thread=" + curThread.getName() + + " for timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs)); + } + + if (isPageInCheckpoint) { + cpBufThrottledThreads.put(curThread.getId(), curThread); + + try { + LockSupport.parkNanos(throttleParkTimeNs); + } finally { + cpBufThrottledThreads.remove(curThread.getId()); + + if (throttleParkTimeNs > LOGGING_THRESHOLD) { + LOG.warn("Unparking thread=" + curThread.getName() + + " with park timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs)); + } + } + } else { + LockSupport.parkNanos(throttleParkTimeNs); + } + } else { + boolean backoffWasAlreadyStarted = exponentialThrottle.resetBackoff(); + + if (isPageInCheckpoint && backoffWasAlreadyStarted) { + unparkParkedThreads(); + } + } + } + + /** {@inheritDoc} */ + @Override public void wakeupThrottledThreads() { + if (!isCpBufferOverflowThresholdExceeded()) { + inCheckpointProtection.resetBackoff(); + + unparkParkedThreads(); + } + } + + private void unparkParkedThreads() { + cpBufThrottledThreads.values().forEach(LockSupport::unpark); + } + + /** {@inheritDoc} */ + @Override public void onBeginCheckpoint() { + } + + /** {@inheritDoc} */ Review Comment: ```suggestion ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; +import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress; + +/** + * Throttles threads that generate dirty pages during ongoing checkpoint. + * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. + */ +public class PagesWriteThrottle implements PagesWriteThrottlePolicy { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class); + + /** Page memory. */ + private final PersistentPageMemory pageMemory; + + /** Database manager. */ + private final Supplier<CheckpointProgress> cpProgress; + + /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */ + private final boolean throttleOnlyPagesInCheckpoint; + + /** Checkpoint lock state checker. */ + private final CheckpointLockStateChecker stateChecker; + + /** In-checkpoint protection logic. 
*/ + private final ExponentialBackoffThrottlingStrategy inCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Not-in-checkpoint protection logic. */ + private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Checkpoint Buffer-related logic used to keep it safe. */ + private final CheckpointBufferOverflowWatchdog cpBufferWatchdog; + + /** Threads that are throttled due to checkpoint buffer overflow. */ + private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>(); + + /** + * Constructor. + * + * @param pageMemory Page memory. + * @param cpProgress Database manager. + * @param stateChecker checkpoint lock state checker. + * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow. + */ + public PagesWriteThrottle( + PersistentPageMemory pageMemory, + Supplier<CheckpointProgress> cpProgress, + CheckpointLockStateChecker stateChecker, + boolean throttleOnlyPagesInCheckpoint + ) { + this.pageMemory = pageMemory; + this.cpProgress = cpProgress; + this.stateChecker = stateChecker; + this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint; + cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory); + + assert throttleOnlyPagesInCheckpoint || cpProgress != null + : "cpProgress must be not null if ratio based throttling mode is used"; + } + + /** {@inheritDoc} */ + @Override public void onMarkDirty(boolean isPageInCheckpoint) { + assert stateChecker.checkpointLockIsHeldByThread(); + + boolean shouldThrottle = false; + + if (isPageInCheckpoint) { + shouldThrottle = isCpBufferOverflowThresholdExceeded(); + } + + if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) { + CheckpointProgress progress = cpProgress.get(); + + if (progress == null) { + return; // Don't throttle if checkpoint is not running. 
+ } + + int cpWrittenPages = progress.writtenPages(); + + int cpTotalPages = progress.currentCheckpointPagesCount(); + + if (cpWrittenPages == cpTotalPages) { + // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4 + shouldThrottle = pageMemory.shouldThrottle(0.75); + } else { + double dirtyRatioThreshold = ((double) cpWrittenPages) / cpTotalPages; + + // Starting with 0.05 to avoid throttle right after checkpoint start + // 7/12 is maximum ratio of dirty pages Review Comment: ```suggestion // 7/12 is maximum ratio of dirty pages. ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; +import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress; + +/** + * Throttles threads that generate dirty pages during ongoing checkpoint. + * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. + */ +public class PagesWriteThrottle implements PagesWriteThrottlePolicy { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class); + + /** Page memory. */ + private final PersistentPageMemory pageMemory; + + /** Database manager. */ + private final Supplier<CheckpointProgress> cpProgress; + + /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */ + private final boolean throttleOnlyPagesInCheckpoint; + + /** Checkpoint lock state checker. */ + private final CheckpointLockStateChecker stateChecker; + + /** In-checkpoint protection logic. */ + private final ExponentialBackoffThrottlingStrategy inCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Not-in-checkpoint protection logic. */ + private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Checkpoint Buffer-related logic used to keep it safe. */ + private final CheckpointBufferOverflowWatchdog cpBufferWatchdog; + + /** Threads that are throttled due to checkpoint buffer overflow. */ + private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>(); + + /** + * Constructor. 
+ * + * @param pageMemory Page memory. + * @param cpProgress Database manager. + * @param stateChecker checkpoint lock state checker. + * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow. + */ + public PagesWriteThrottle( + PersistentPageMemory pageMemory, + Supplier<CheckpointProgress> cpProgress, + CheckpointLockStateChecker stateChecker, + boolean throttleOnlyPagesInCheckpoint + ) { + this.pageMemory = pageMemory; + this.cpProgress = cpProgress; + this.stateChecker = stateChecker; + this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint; + cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory); + + assert throttleOnlyPagesInCheckpoint || cpProgress != null + : "cpProgress must be not null if ratio based throttling mode is used"; + } + + /** {@inheritDoc} */ + @Override public void onMarkDirty(boolean isPageInCheckpoint) { + assert stateChecker.checkpointLockIsHeldByThread(); + + boolean shouldThrottle = false; + + if (isPageInCheckpoint) { + shouldThrottle = isCpBufferOverflowThresholdExceeded(); + } + + if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) { + CheckpointProgress progress = cpProgress.get(); + + if (progress == null) { + return; // Don't throttle if checkpoint is not running. 
+ } + + int cpWrittenPages = progress.writtenPages(); + + int cpTotalPages = progress.currentCheckpointPagesCount(); + + if (cpWrittenPages == cpTotalPages) { + // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4 + shouldThrottle = pageMemory.shouldThrottle(0.75); + } else { + double dirtyRatioThreshold = ((double) cpWrittenPages) / cpTotalPages; + + // Starting with 0.05 to avoid throttle right after checkpoint start + // 7/12 is maximum ratio of dirty pages + dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 / 12; + + shouldThrottle = pageMemory.shouldThrottle(dirtyRatioThreshold); + } + } + + ExponentialBackoffThrottlingStrategy exponentialThrottle = isPageInCheckpoint + ? inCheckpointProtection : notInCheckpointProtection; + + if (shouldThrottle) { + long throttleParkTimeNs = exponentialThrottle.protectionParkTime(); + + Thread curThread = Thread.currentThread(); + + if (throttleParkTimeNs > LOGGING_THRESHOLD) { + LOG.warn("Parking thread=" + curThread.getName() + + " for timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs)); + } + + if (isPageInCheckpoint) { + cpBufThrottledThreads.put(curThread.getId(), curThread); + + try { + LockSupport.parkNanos(throttleParkTimeNs); + } finally { + cpBufThrottledThreads.remove(curThread.getId()); + + if (throttleParkTimeNs > LOGGING_THRESHOLD) { + LOG.warn("Unparking thread=" + curThread.getName() + + " with park timeout(ms)=" + TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs)); + } + } + } else { + LockSupport.parkNanos(throttleParkTimeNs); + } + } else { + boolean backoffWasAlreadyStarted = exponentialThrottle.resetBackoff(); + + if (isPageInCheckpoint && backoffWasAlreadyStarted) { + unparkParkedThreads(); + } + } + } + + /** {@inheritDoc} */ + @Override public void wakeupThrottledThreads() { + if (!isCpBufferOverflowThresholdExceeded()) { + inCheckpointProtection.resetBackoff(); + + unparkParkedThreads(); + } + } + + private void unparkParkedThreads() { + 
cpBufThrottledThreads.values().forEach(LockSupport::unpark); + } + + /** {@inheritDoc} */ + @Override public void onBeginCheckpoint() { + } + + /** {@inheritDoc} */ + @Override public void onFinishCheckpoint() { + inCheckpointProtection.resetBackoff(); + notInCheckpointProtection.resetBackoff(); + } + + /** {@inheritDoc} */ Review Comment: ```suggestion ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoff.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Implements exponential backoff logic. Contains a counter and increments it on each {@link #nextDuration()}. + * May be reset using {@link #reset()}. + */ +class ExponentialBackoff { + /** + * Starting backoff duration. + */ + private final long startingBackoffNanos; + + /** + * Backoff ratio. Each next duration will be this times longer. + */ + private final double backoffRatio; + + /** + * Exponential backoff counter. 
+ */ + private final AtomicInteger exponentialBackoffCounter = new AtomicInteger(0); + + /** + * Constructs a new instance with the given parameters. + * + * @param startingBackoffNanos duration of first backoff in nanoseconds + * @param backoffRatio each next duration will be this times longer + */ + public ExponentialBackoff(long startingBackoffNanos, double backoffRatio) { + this.startingBackoffNanos = startingBackoffNanos; + this.backoffRatio = backoffRatio; + } + + /** + * Returns next backoff duration (in nanoseconds). As a side effect, increments the backoff counter so that + * next call will return a longer duration. + * + * @return next backoff duration in nanoseconds + */ + public long nextDuration() { + int exponent = exponentialBackoffCounter.getAndIncrement(); + return (long) (startingBackoffNanos * Math.pow(backoffRatio, exponent)); + } + + /** + * Resets the exponential backoff counter so that next call to {@link #nextDuration()} + * will return {@link #startingBackoffNanos}. + * + * @return {@code true} iff this backoff was not already in a reset state Review Comment: ```suggestion * @return {@code true} iff this backoff was not already in a reset state. ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/CheckpointBufferOverflowWatchdog.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import static org.apache.ignite.internal.pagememory.persistence.throttling.PagesWriteThrottlePolicy.CP_BUF_FILL_THRESHOLD; + +import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; + +/** + * Logic used to determine whether Checkpoint Buffer is in danger zone and writer threads should be throttled. + */ +class CheckpointBufferOverflowWatchdog { + /** Page memory. */ + private final PersistentPageMemory pageMemory; + + /** + * Creates a new instance. + * + * @param pageMemory page memory to use + */ + CheckpointBufferOverflowWatchdog(PersistentPageMemory pageMemory) { + this.pageMemory = pageMemory; + } + + /** + * Returns true if Checkpoint Buffer is in danger zone (more than + * {@link PagesWriteThrottlePolicy#CP_BUF_FILL_THRESHOLD} of the buffer is filled) and, hence, writer threads need + * to be throttled. + * + * @return {@code true} iff Checkpoint Buffer is in danger zone Review Comment: ```suggestion * @return {@code true} iff Checkpoint Buffer is in danger zone. ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/CheckpointBufferOverflowWatchdog.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import static org.apache.ignite.internal.pagememory.persistence.throttling.PagesWriteThrottlePolicy.CP_BUF_FILL_THRESHOLD; + +import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; + +/** + * Logic used to determine whether Checkpoint Buffer is in danger zone and writer threads should be throttled. + */ +class CheckpointBufferOverflowWatchdog { + /** Page memory. */ + private final PersistentPageMemory pageMemory; + + /** + * Creates a new instance. + * + * @param pageMemory page memory to use Review Comment: ```suggestion * @param pageMemory Page memory to use. ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; +import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress; + +/** + * Throttles threads that generate dirty pages during ongoing checkpoint. + * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. + */ +public class PagesWriteThrottle implements PagesWriteThrottlePolicy { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(PagesWriteThrottle.class); + + /** Page memory. */ + private final PersistentPageMemory pageMemory; + + /** Database manager. */ + private final Supplier<CheckpointProgress> cpProgress; + + /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */ + private final boolean throttleOnlyPagesInCheckpoint; + + /** Checkpoint lock state checker. */ + private final CheckpointLockStateChecker stateChecker; + + /** In-checkpoint protection logic. */ + private final ExponentialBackoffThrottlingStrategy inCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Not-in-checkpoint protection logic. 
*/ + private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection + = new ExponentialBackoffThrottlingStrategy(); + + /** Checkpoint Buffer-related logic used to keep it safe. */ + private final CheckpointBufferOverflowWatchdog cpBufferWatchdog; + + /** Threads that are throttled due to checkpoint buffer overflow. */ + private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>(); + + /** + * Constructor. + * + * @param pageMemory Page memory. + * @param cpProgress Database manager. + * @param stateChecker checkpoint lock state checker. + * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow. Review Comment: ```suggestion * @param throttleOnlyPagesInCheckpoint If {@code true}, throttle will only protect from checkpoint buffer overflow. ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottlePolicy.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +/** + * Throttling policy, encapsulates logic of delaying write operations. 
+ * + * <p>There are two resources that get (or might get) consumed when writing: + * <ul> + * <li> + * <b>Checkpoint Buffer</b> where a page is placed when, being under checkpoint, it gets written + * </li> + * <li> + * <b>Clean pages</b> which get dirtied when writes occur + * </li> + * </ul> + * Both resources are limited in size. Both are freed when checkpoint finishes. This means that, if writers + * write too fast, they can consume any of these two resources before we have a chance to finish a checkpoint. + * If this happens, the cluster fails or stalls. + * + * <p>Write throttling solves this problem by slowing down the writers to a rate at which they do not exhaust + * any of the two resources. + * + * <p>An alternative to just slowing down is to wait in a loop till the resource we're after gets freed, and + * only then allow the write to happen. The problem with this approach is that we cannot wait in a loop/sleep + * under a write lock, so the logic would be a lot more complicated. Maybe in the future we'll follow this path, + * but for now, a simpler approach of just throttling is used (see below). + * + * <p>If we just slow writers down by throttling their writes, AND we have enough Checkpoint Buffer and pages in + * segments to take some load bursts, we are fine. Under such assumptions, it does not matter whether we throttle + * a writer thread before acquiring write lock or after it gets released; in the current implementation, this + * happens after write lock gets released (because it was considered simpler to implement). + * + * <p>The actual throttling happens when a page gets marked dirty by calling {@link #onMarkDirty(boolean)}. 
+ * + * <p>There are two additional methods for interfacing with other parts of the system: + * <ul> + * <li>{@link #wakeupThrottledThreads()} which wakes up the threads currently being throttled; in the current + * implementation, it is called when Checkpoint Buffer utilization falls below 1/2.</li> + * <li>{@link #isCpBufferOverflowThresholdExceeded()} which is called by a checkpointer to see whether the Checkpoint Buffer is + * in a danger zone and, if yes, it starts to prioritize writing pages from the Checkpoint Buffer over + * pages from the normal checkpoint sequence.</li> + * </ul> + */ +public interface PagesWriteThrottlePolicy { + // TODO Maybe make it configurable in IGNITE-24548 + /** Min park time which triggers logging. */ + long LOGGING_THRESHOLD = 10; + + /** Checkpoint buffer fullfill upper bound. */ + float CP_BUF_FILL_THRESHOLD = 2f / 3; + + /** + * Callback to apply throttling delay. + * + * @param isPageInCheckpoint flag indicating if current page is in scope of current checkpoint. Review Comment: ```suggestion * @param isPageInCheckpoint Flag indicating if current page is in scope of current checkpoint. ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoff.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.throttling; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Implements exponential backoff logic. Contains a counter and increments it on each {@link #nextDuration()}. + * May be reset using {@link #reset()}. + */ +class ExponentialBackoff { + /** + * Starting backoff duration. + */ + private final long startingBackoffNanos; + + /** + * Backoff ratio. Each next duration will be this times longer. + */ + private final double backoffRatio; + + /** + * Exponential backoff counter. + */ + private final AtomicInteger exponentialBackoffCounter = new AtomicInteger(0); + + /** + * Constructs a new instance with the given parameters. + * + * @param startingBackoffNanos duration of first backoff in nanoseconds + * @param backoffRatio each next duration will be this times longer + */ + public ExponentialBackoff(long startingBackoffNanos, double backoffRatio) { + this.startingBackoffNanos = startingBackoffNanos; + this.backoffRatio = backoffRatio; + } + + /** + * Returns next backoff duration (in nanoseconds). As a side effect, increments the backoff counter so that + * next call will return a longer duration. + * + * @return next backoff duration in nanoseconds Review Comment: ```suggestion * @return Next backoff duration in nanoseconds. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org