Michał Misiewicz created FLINK-38096:
----------------------------------------
Summary: AsyncSinkWriter may hang when configured with a custom rate limiting strategy
Key: FLINK-38096
URL: https://issues.apache.org/jira/browse/FLINK-38096
Project: Flink
Issue Type: Bug
Components: Connectors / Common
Affects Versions: 1.20.2, 1.20.1, 1.20.0, 2.0.0
Reporter: Michał Misiewicz

[AsyncSinkWriter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java] may hang when it is configured with a custom rate-limiting strategy whose shouldBlock() can return true even when no other requests are in flight. This can happen, for example, when the strategy enforces a rate limit such as a maximum number of API requests per time interval.

Consider the following RateLimitingStrategy implementation based on [bucket4j|https://github.com/bucket4j/bucket4j]:

{code:scala}
package pl.zabka.cdp.common.ratelimit

import com.typesafe.scalalogging.LazyLogging
import io.github.bucket4j.Bucket
import org.apache.flink.connector.base.sink.writer.strategy.{
  RateLimitingStrategy,
  RequestInfo,
  ResultInfo
}

import java.io.Serializable

class TokenBucketRateLimitingStrategy(
    maxInFlightRequests: Int,
    tokensPerSecond: Long,
    tokensPerMinute: Long
) extends RateLimitingStrategy
    with LazyLogging
    with Serializable {

  @transient private var currentInFlightRequests = 0

  @transient private lazy val bucket: Bucket = TokenBucketProvider.getInstance(
    "TokenBucketRateLimitingStrategy",
    tokensPerSecond,
    tokensPerMinute
  )

  override def shouldBlock(requestInfo: RequestInfo): Boolean = {
    currentInFlightRequests >= maxInFlightRequests || areTokensNotAvailable(requestInfo)
  }

  private def areTokensNotAvailable(requestInfo: RequestInfo): Boolean = {
    val batchSize = requestInfo.getBatchSize
    if (batchSize <= 0) {
      logger.debug(s"Received request with invalid batch size: $batchSize, allowing to proceed")
      return false
    }
    !bucket.estimateAbilityToConsume(batchSize).canBeConsumed
  }

  ...
}
{code}

AsyncSinkWriter may hang on [mailboxExecutor.yield()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L356] when all tokens have already been consumed and no requests are in flight:

{code:java}
private void flush() throws InterruptedException {
    RequestInfo requestInfo = createRequestInfo();
    while (rateLimitingStrategy.shouldBlock(requestInfo)) {
        mailboxExecutor.yield();
        requestInfo = createRequestInfo();
    }
    ...
}
{code}

mailboxExecutor.yield() blocks until another mail is available in the mailbox. With no requests in flight, there may be no completion callback to post one, so the loop can stay blocked inside yield() and never re-evaluate shouldBlock(), even after the bucket has refilled. We observe the job hanging during checkpointing, which eventually makes it fail with a checkpoint timeout.

Solution: call mailboxExecutor.yield() only when there are requests in flight. Replacing the call with the existing private method [yieldIfThereExistsInFlightRequests()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L443] fixes the problem directly:

{code:java}
private void flush() throws InterruptedException {
    RequestInfo requestInfo = createRequestInfo();
    while (rateLimitingStrategy.shouldBlock(requestInfo)) {
        yieldIfThereExistsInFlightRequests();
        requestInfo = createRequestInfo();
    }
    ...
}
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
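To illustrate the difference between the two flush loops, here is a small standalone Java simulation. It does not use Flink: FakeMailbox, RefillLimiter, flushAlwaysYield and flushYieldIfInFlight are hypothetical names, and the blocking yield is modeled with a timeout so the program terminates instead of hanging. It assumes that a mailbox yield returns only once some other mail (for example, a request-completion callback) has been posted.

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FlushLoopSketch {

    /** Hypothetical stand-in for the mailbox: yield waits for the next mail and runs it. */
    static final class FakeMailbox {
        private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

        /** Posts a mail, e.g. the completion callback of an in-flight request. */
        void execute(Runnable mail) {
            mails.add(mail);
        }

        /**
         * Like a blocking yield, but gives up after the timeout so the demo terminates.
         * Returns false if no mail arrived, i.e. a real blocking yield would still be waiting.
         */
        boolean yieldWithTimeout(long timeoutMillis) throws InterruptedException {
            Runnable mail = mails.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (mail == null) {
                return false;
            }
            mail.run();
            return true;
        }
    }

    /** Hypothetical limiter: blocks until a refill time has passed, independent of in-flight requests. */
    static final class RefillLimiter {
        private final long refillAtNanos;

        RefillLimiter(long refillAfterMillis) {
            this.refillAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(refillAfterMillis);
        }

        boolean shouldBlock() {
            return System.nanoTime() < refillAtNanos;
        }
    }

    /** Mirrors the original loop: always yields while blocked. Returns true if the flush proceeds. */
    static boolean flushAlwaysYield(FakeMailbox mailbox, RefillLimiter limiter, long timeoutMillis)
            throws InterruptedException {
        while (limiter.shouldBlock()) {
            if (!mailbox.yieldWithTimeout(timeoutMillis)) {
                return false; // stuck: nothing posted a mail, so the limiter is never re-checked
            }
        }
        return true;
    }

    /** Mirrors the proposed loop: yields only when requests are in flight. */
    static boolean flushYieldIfInFlight(
            FakeMailbox mailbox, RefillLimiter limiter, int inFlightRequests, long timeoutMillis)
            throws InterruptedException {
        while (limiter.shouldBlock()) {
            if (inFlightRequests > 0 && !mailbox.yieldWithTimeout(timeoutMillis)) {
                return false;
            }
            // With nothing in flight, fall through and re-check the limiter (busy-waits).
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Tokens refill after 50 ms and no request is in flight, so no mail ever arrives.
        FakeMailbox mailbox = new FakeMailbox();
        boolean original = flushAlwaysYield(mailbox, new RefillLimiter(50), 500);
        boolean fixed = flushYieldIfInFlight(mailbox, new RefillLimiter(50), 0, 500);
        System.out.println("always yield proceeds: " + original);
        System.out.println("yield only if in flight proceeds: " + fixed);
    }
}
{code}

Running it prints "always yield proceeds: false" followed by "yield only if in flight proceeds: true". Note that in this sketch the fixed loop busy-waits on shouldBlock() while nothing is in flight; the proposed change would presumably behave similarly until tokens become available again.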