Michał Misiewicz created FLINK-38096:
----------------------------------------

             Summary: AsyncSinkWriter may hang when configured with a custom rate limiting strategy
                 Key: FLINK-38096
                 URL: https://issues.apache.org/jira/browse/FLINK-38096
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Common
    Affects Versions: 1.20.2, 1.20.1, 1.20.0, 2.0.0
            Reporter: Michał Misiewicz


[AsyncSinkWriter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java] may hang when used with a custom rate-limiting strategy that can block new requests while no other requests are in flight. This situation arises when implementing throughput limits, such as capping the number of API requests per time interval.

Given the following RateLimitingStrategy implementation based on 
[bucket4j|https://github.com/bucket4j/bucket4j]:

{code:java}
package pl.zabka.cdp.common.ratelimit

import com.typesafe.scalalogging.LazyLogging
import io.github.bucket4j.Bucket
import org.apache.flink.connector.base.sink.writer.strategy.{
  RateLimitingStrategy,
  RequestInfo,
  ResultInfo
}

import java.io.Serializable

class TokenBucketRateLimitingStrategy(
    maxInFlightRequests: Int,
    tokensPerSecond: Long,
    tokensPerMinute: Long
) extends RateLimitingStrategy
    with LazyLogging
    with Serializable {

  @transient
  private var currentInFlightRequests = 0

  @transient
  private lazy val bucket: Bucket = TokenBucketProvider.getInstance(
    "TokenBucketRateLimitingStrategy",
    tokensPerSecond,
    tokensPerMinute
  )

  override def shouldBlock(requestInfo: RequestInfo): Boolean = {
    currentInFlightRequests >= maxInFlightRequests ||
      areTokensNotAvailable(requestInfo)
  }

  private def areTokensNotAvailable(requestInfo: RequestInfo): Boolean = {
    val batchSize = requestInfo.getBatchSize
    if (batchSize <= 0) {
      logger.debug(
        s"Received request with invalid batch size: $batchSize, allowing to proceed"
      )
      return false
    }
    !bucket.estimateAbilityToConsume(batchSize).canBeConsumed
  }

  // ...
} {code}
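
The TokenBucketProvider used above is not part of this report. Purely for illustration, a minimal sketch of such a provider could look like the following, assuming a JVM-wide registry of named buckets and bucket4j 8.x's Bucket.builder()/Bandwidth.simple API; only the object name and the getInstance signature are taken from the call site above, everything else is an assumption:
{code:java}
package pl.zabka.cdp.common.ratelimit

import io.github.bucket4j.{Bandwidth, Bucket}

import java.time.Duration
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch only: a JVM-wide registry of named token buckets, so that
// all sink subtasks running in the same task manager share one bucket per name.
object TokenBucketProvider {

  private val buckets = new ConcurrentHashMap[String, Bucket]()

  def getInstance(name: String, tokensPerSecond: Long, tokensPerMinute: Long): Bucket =
    buckets.computeIfAbsent(
      name,
      _ =>
        Bucket
          .builder()
          // two bandwidths: a per-second and a per-minute limit
          .addLimit(Bandwidth.simple(tokensPerSecond, Duration.ofSeconds(1)))
          .addLimit(Bandwidth.simple(tokensPerMinute, Duration.ofMinutes(1)))
          .build()
    )
} {code}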
AsyncSinkWriter may hang on [mailboxExecutor.yield()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L356] when all tokens are already consumed and no requests are in flight: yield() blocks until another mail arrives in the mailbox, and with zero in-flight requests there is no completion callback left to deliver one, so shouldBlock() is never re-evaluated even after the token bucket refills:

{code:java}
private void flush() throws InterruptedException {
    RequestInfo requestInfo = createRequestInfo();
    while (rateLimitingStrategy.shouldBlock(requestInfo)) {
        mailboxExecutor.yield();
        requestInfo = createRequestInfo();
    }
    ...
}{code}

We're observing Flink hanging during checkpointing, which leads to job failure due to checkpoint timeout.


Solution:

Ensure that mailboxExecutor.yield() is only called while in-flight requests are present. Replacing the call with the existing private helper [yieldIfThereExistsInFlightRequests()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L443] provides a direct fix:
{code:java}
private void flush() throws InterruptedException {
    RequestInfo requestInfo = createRequestInfo();
    while (rateLimitingStrategy.shouldBlock(requestInfo)) {
        yieldIfThereExistsInFlightRequests();
        requestInfo = createRequestInfo();
    }
    ...
} {code}
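
For reference, that helper already exists in AsyncSinkWriter and (roughly, as of current master) just guards the yield with the in-flight counter:
{code:java}
private void yieldIfThereExistsInFlightRequests() throws InterruptedException {
    if (inFlightRequestsCount > 0) {
        mailboxExecutor.yield();
    }
}{code}
With this change, when no requests are in flight the loop simply re-evaluates shouldBlock() until the strategy lets the batch through (e.g. after the token bucket refills), instead of blocking on a mailbox mail that will never arrive.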
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
