[
https://issues.apache.org/jira/browse/FLINK-38096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michał Misiewicz updated FLINK-38096:
-------------------------------------
Description:
[AsyncSinkWriter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java]
may hang when used with a custom rate-limiting strategy that can block new
requests even when no requests are currently in flight. This happens when
implementing rate limits such as a maximum number of API requests per time
interval.
Given the following RateLimitingStrategy implementation based on
[bucket4j|https://github.com/bucket4j/bucket4j]:
{code:scala}
package pl.zabka.cdp.common.ratelimit

import com.typesafe.scalalogging.LazyLogging
import io.github.bucket4j.Bucket
import org.apache.flink.connector.base.sink.writer.strategy.{
  RateLimitingStrategy,
  RequestInfo,
  ResultInfo
}

import java.io.Serializable

class TokenBucketRateLimitingStrategy(
    maxInFlightRequests: Int,
    tokensPerSecond: Long,
    tokensPerMinute: Long
) extends RateLimitingStrategy
    with LazyLogging
    with Serializable {

  @transient
  private var currentInFlightRequests = 0

  @transient
  private lazy val bucket: Bucket = TokenBucketProvider.getInstance(
    "TokenBucketRateLimitingStrategy",
    tokensPerSecond,
    tokensPerMinute
  )

  // Block while the in-flight limit is reached or the token bucket is exhausted.
  override def shouldBlock(requestInfo: RequestInfo): Boolean = {
    currentInFlightRequests >= maxInFlightRequests ||
    areTokensNotAvailable(requestInfo)
  }

  private def areTokensNotAvailable(requestInfo: RequestInfo): Boolean = {
    val batchSize = requestInfo.getBatchSize
    if (batchSize <= 0) {
      logger.debug(
        s"Received request with invalid batch size: $batchSize, allowing to proceed"
      )
      return false
    }
    !bucket.estimateAbilityToConsume(batchSize).canBeConsumed
  }

  ...
} {code}
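TokenBucketProvider is not shown above; for context, here is a minimal sketch of what such a provider might look like (hypothetical, not part of this ticket; it assumes bucket4j's Bucket.builder()/Bandwidth.simple API and a JVM-wide registry so that sink subtasks in the same task manager share one token budget):
{code:scala}
// Hypothetical sketch, not from the ticket: a process-wide registry of
// bucket4j buckets keyed by name, created lazily on first use.
import io.github.bucket4j.{Bandwidth, Bucket}

import java.time.Duration
import scala.collection.concurrent.TrieMap

object TokenBucketProvider {
  private val buckets = TrieMap.empty[String, Bucket]

  def getInstance(name: String, tokensPerSecond: Long, tokensPerMinute: Long): Bucket =
    buckets.getOrElseUpdate(
      name,
      Bucket
        .builder()
        .addLimit(Bandwidth.simple(tokensPerSecond, Duration.ofSeconds(1)))
        .addLimit(Bandwidth.simple(tokensPerMinute, Duration.ofMinutes(1)))
        .build()
    )
}
{code}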
AsyncSinkWriter may then hang on
[mailboxExecutor.yield()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L356]
when all tokens are already consumed and no requests are in flight: yield()
blocks until another mail arrives, but with zero in-flight requests there is no
pending completion callback that would enqueue one, so the loop may never resume:
{code:java}
private void flush() throws InterruptedException {
    RequestInfo requestInfo = createRequestInfo();
    while (rateLimitingStrategy.shouldBlock(requestInfo)) {
        mailboxExecutor.yield();
        requestInfo = createRequestInfo();
    }
    ...
}{code}
Once the rate limit is hit, the writer hangs during checkpointing and the job
eventually fails with checkpoint timeouts.
Solution:
Ensure mailboxExecutor.yield() is only called while at least one request is in
flight. Replacing the call with the existing private helper
[yieldIfThereExistsInFlightRequests()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L443]
addresses the problem directly:
{code:java}
private void flush() throws InterruptedException {
    RequestInfo requestInfo = createRequestInfo();
    while (rateLimitingStrategy.shouldBlock(requestInfo)) {
        yieldIfThereExistsInFlightRequests();
        requestInfo = createRequestInfo();
    }
    ...
} {code}
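With this change, when nothing is in flight the flush loop simply re-evaluates shouldBlock(), and a token bucket reports tokens as available again once it refills, so the writer makes progress instead of waiting on a mail that may never arrive. A small self-contained sketch of that refill behaviour (not from the ticket; it assumes bucket4j's Bucket.builder()/Bandwidth.simple API):
{code:scala}
// Hedged sketch, not from the ticket: a drained bucket4j bucket reports
// "cannot consume" only until it refills, which is why re-checking
// shouldBlock() in the flush loop eventually succeeds.
import io.github.bucket4j.{Bandwidth, Bucket}

import java.time.Duration

object RefillDemo extends App {
  val bucket: Bucket = Bucket
    .builder()
    .addLimit(Bandwidth.simple(10L, Duration.ofSeconds(1))) // 10 tokens per second
    .build()

  bucket.tryConsume(10L)                                     // drain the bucket
  println(bucket.estimateAbilityToConsume(5L).canBeConsumed) // false: exhausted
  Thread.sleep(1000)                                         // wait for the refill
  println(bucket.estimateAbilityToConsume(5L).canBeConsumed) // true: tokens are back
}
{code}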
> AsyncSinkWriter may hang when configured with a custom rate limiting strategy
> -----------------------------------------------------------------------------
>
> Key: FLINK-38096
> URL: https://issues.apache.org/jira/browse/FLINK-38096
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 2.0.0, 1.20.0, 1.20.1, 1.20.2
> Reporter: Michał Misiewicz
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)