hvanhovell commented on code in PR #49712: URL: https://github.com/apache/spark/pull/49712#discussion_r1935670931
########## common/utils/src/main/scala/org/apache/spark/internal/Logging.scala: ########## @@ -531,3 +533,156 @@ private[spark] object Logging { override def isStopped: Boolean = status == LifeCycle.State.STOPPED } } + +/** + * A thread-safe token bucket-based throttler implementation with nanosecond accuracy. + * + * Each instance must be shared across all scopes it should throttle. + * For global throttling that means either by extending this class in an `object` or + * by creating the instance as a field of an `object`. + * + * @param bucketSize This corresponds to the largest possible burst without throttling, + * in number of executions. + * @param tokenRecoveryInterval Time between two tokens being added back to the bucket. + * This is reciprocal of the long-term average unthrottled rate. + * + * Example: With a bucket size of 100 and a recovery interval of 1s, we could log up to 100 events + * in under a second without throttling, but at that point the bucket is exhausted and we only + * regain the ability to log more events at 1 event per second. If we log less than 1 event/s + * the bucket will slowly refill until it's back at 100. + * Either way, we can always log at least 1 event/s. + */ +class LogThrottler( + val bucketSize: Int = 100, + val tokenRecoveryInterval: FiniteDuration = 1.second, + val timeSource: NanoTimeTimeSource = SystemNanoTimeSource) extends Logging { + + private var remainingTokens = bucketSize + private var nextRecovery: DeadlineWithTimeSource = + DeadlineWithTimeSource.now(timeSource) + tokenRecoveryInterval + private var numSkipped: Long = 0 + + /** + * Run `thunk` as long as there are tokens remaining in the bucket, + * otherwise skip and remember number of skips. + * + * The argument to `thunk` is how many previous invocations have been skipped since the last time + * an invocation actually ran. + * + * Note: This method is `synchronized`, so it is concurrency safe. 
+ * However, that also means no heavy-lifting should be done as part of this + * if the throttler is shared between concurrent threads. + * This also means that the synchronized block of the `thunk` that *does* execute will still + * hold up concurrent `thunk`s that will actually get rejected once they hold the lock. + * This is fine at low concurrency/low recovery rates. But if we need this to be more efficient at + * some point, we will need to decouple the check from the `thunk` execution. + */ + def throttled(thunk: Long => Unit): Unit = this.synchronized { + tryRecoverTokens() + if (remainingTokens > 0) { + thunk(numSkipped) + numSkipped = 0 + remainingTokens -= 1 + } else { + numSkipped += 1L + } + } + + /** + * Same as [[throttled]] but turns the number of skipped invocations into a logging message + * that can be appended to item being logged in `thunk`. + */ + def throttledWithSkippedLogMessage(thunk: MessageWithContext => Unit): Unit = { + this.throttled { numSkipped => + val skippedStr = if (numSkipped != 0L) { + log" [${MDC(LogKeys.NUM_SKIPPED, numSkipped)} similar messages were skipped.]" + } else { + log"" + } + thunk(skippedStr) + } + } + + /** + * Try to recover tokens, if the rate allows. + * + * Only call from within a `this.synchronized` block! + */ + private[spark] def tryRecoverTokens(): Unit = { + try { + // Doing it one-by-one is a bit inefficient for long periods, but it's easy to avoid jumps + // and rounding errors this way. The inefficiency shouldn't matter as long as the bucketSize + // isn't huge. + while (remainingTokens < bucketSize && nextRecovery.isOverdue()) { + remainingTokens += 1 + nextRecovery += tokenRecoveryInterval + } + if (remainingTokens == bucketSize && + (DeadlineWithTimeSource.now(timeSource) - nextRecovery) > tokenRecoveryInterval) { Review Comment: Nit: `DeadlineWithTimeSource.now(timeSource)` — you are creating this value twice. You could compute it once and ignore the few tens of nanoseconds of difference...
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org