[ https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17809592#comment-17809592 ]
Jorge Esteban Quilcate Otoya commented on KAFKA-15776:
------------------------------------------------------

Agree with [~fvisconte] that tweaking an existing config on the consumer side is undesirable, given that Tiered Storage aims to be transparent to clients.

An additional issue, even when caching fetch requests, is that a remote fetch doesn't only fetch the log segment but potentially also the offset index. Since RemoteIndexCache is a synchronous cache, interrupted fetches are not cached and can block a consumer's progress. This has [pushed us|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/pull/472] to build an additional async cache for indexes as a workaround.

Some additional thoughts on how to approach this issue:

On not interrupting the thread: this would help remove the flood of exceptions, but it would pile up threads, since retries could leave more than one thread per consumer fetching a partition, potentially exhausting the remote reader thread pool (default size = 10) and causing other issues. By the way, I can see that the delayed remote fetch operation has a fixed purge interval of 1000 with no config. Should we have a config for this one? Or, because there is already a thread pool size, is there no need for this configuration?

On the timeout configuration semantics: based on [https://github.com/apache/kafka/pull/14778#issuecomment-1820588080], we should update our docs and make the expectations about `fetch.max.wait.ms` explicit: it only applies to data available in the local log, and if topics are tiered then larger latencies may apply. We could also consider adding a new exception type for interrupted remote fetch operations, so the RLM can use it to choose the proper logging level. We would need to document the RSM interface accordingly and ask implementations to report interrupted exceptions properly.

On the remote fetch timeout configuration: an additional configuration certainly seems needed for this delayed operation, but a separate config with only a larger default value helps just to a point. Instead of a fixed timeout, we could consider backoff configs that set the boundaries: start interrupting remote fetches at a lower bound, bump the timeout (e.g. +100 ms) on each retry, and once an upper bound is reached start reporting failures to consumers. This gives operators better knobs to tune, e.g. a lower bound of around 2 seconds to start interrupting remote fetches and 10 seconds to start failing consumer requests. Having both these configs and a new exception type would enable proper handling of these exceptions: report them at e.g. WARN/DEBUG level while below the max timeout, and fail the consumer request and log at WARN/ERROR when hitting the upper bound (a rough sketch follows below). Also, since the degradation happens between the broker and the remote storage, this should not be a consumer configuration; consumers can't have the context needed to tune these values. Instead, these configurations should live on the broker side for operators to set.

cc [~showuon] [~satishd]
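To make the escalating-timeout idea above concrete, here is a minimal sketch. The config names (`remote.fetch.wait.ms`, `remote.fetch.wait.backoff.ms`, `remote.fetch.max.wait.ms`), their values, and the `RemoteFetchInterruptedException` type are all illustrative assumptions; none of them exist in Kafka today, and this is not an agreed design:

{code:java}
// Illustrative sketch only: config names, defaults, the exception type and the
// escalation logic are assumptions, not existing Kafka code.

/** Hypothetical exception type so RLM/purgatory code can pick the right log level. */
class RemoteFetchInterruptedException extends RuntimeException {
    RemoteFetchInterruptedException(String message) {
        super(message);
    }
}

class RemoteFetchWaitPolicy {
    // Hypothetical broker-side configs, tuned by operators (not by consumers):
    private final long initialWaitMs;  // e.g. remote.fetch.wait.ms = 2000: start interrupting here
    private final long backoffStepMs;  // e.g. remote.fetch.wait.backoff.ms = 100: bump per retry
    private final long maxWaitMs;      // e.g. remote.fetch.max.wait.ms = 10000: fail the request here

    RemoteFetchWaitPolicy(long initialWaitMs, long backoffStepMs, long maxWaitMs) {
        this.initialWaitMs = initialWaitMs;
        this.backoffStepMs = backoffStepMs;
        this.maxWaitMs = maxWaitMs;
    }

    /** Wait before interrupting the remote fetch on the given retry attempt, capped at the upper bound. */
    long waitMsForAttempt(int attempt) {
        return Math.min(initialWaitMs + (long) attempt * backoffStepMs, maxWaitMs);
    }

    /** Below the upper bound: log quietly and retry. At the upper bound: fail the consumer request. */
    void onInterrupted(int attempt) {
        long waitedMs = waitMsForAttempt(attempt);
        if (waitedMs < maxWaitMs) {
            System.out.printf("DEBUG/WARN: remote fetch interrupted after %d ms, retrying%n", waitedMs);
        } else {
            System.out.printf("WARN/ERROR: remote fetch exceeded %d ms, failing the fetch%n", maxWaitMs);
            throw new RemoteFetchInterruptedException("remote fetch exceeded " + maxWaitMs + " ms");
        }
    }

    public static void main(String[] args) {
        RemoteFetchWaitPolicy policy = new RemoteFetchWaitPolicy(2_000, 100, 10_000);
        for (int attempt = 0; attempt <= 2; attempt++) {
            System.out.println("attempt " + attempt + " waits " + policy.waitMsForAttempt(attempt) + " ms");
        }
    }
}
{code}

With the two bounds, a transient remote-storage hiccup would only produce DEBUG/WARN noise and retries, while sustained degradation would be surfaced to consumers once the upper bound is reached.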
> Update delay timeout for DelayedRemoteFetch request
> ---------------------------------------------------
>
>                 Key: KAFKA-15776
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15776
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as the delay timeout for
> DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait for the
> given amount of time when there is no data available to serve the FETCH request:
> {code:java}
> The maximum amount of time the server will block before answering the fetch
> request if there isn't sufficient data to immediately satisfy the requirement
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user
> on how to configure an optimal value for each purpose. Moreover, the config is of
> *LOW* importance and most users won't configure it, so they will use the default
> value of 500 ms.
> Having a delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to a
> higher number of expired delayed remote fetch requests when the remote storage
> has any degradation.
> We should introduce a {{fetch.remote.max.wait.ms}} config (preferably a server
> config) to define the delay timeout for DelayedRemoteFetch requests, or take it
> from the client similar to {{request.timeout.ms}}.
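For illustration, a minimal sketch contrasting today's behaviour (the client's {{fetch.max.wait.ms}} doubles as the purgatory delay) with the proposed server-side {{fetch.remote.max.wait.ms}}; the 10-second value below is only an assumed example, not a proposed default, and the method is not actual Kafka code:

{code:java}
// Sketch only: contrasts reusing the client's fetch.max.wait.ms with a dedicated,
// operator-controlled broker config for DelayedRemoteFetch; values are assumptions.
class RemoteFetchTimeout {
    /** Delay to use for a DelayedRemoteFetch entry in the purgatory. */
    static long delayMs(long clientFetchMaxWaitMs, Long brokerRemoteFetchMaxWaitMs) {
        if (brokerRemoteFetchMaxWaitMs == null) {
            // Today: the client's fetch.max.wait.ms (default 500 ms) also bounds remote reads,
            // even though it was only meant to bound waiting for local data to accumulate.
            return clientFetchMaxWaitMs;
        }
        // Proposed: a dedicated broker config bounds remote reads instead.
        return brokerRemoteFetchMaxWaitMs;
    }

    public static void main(String[] args) {
        System.out.println(delayMs(500L, null));      // current behaviour: 500 ms
        System.out.println(delayMs(500L, 10_000L));   // proposed: e.g. 10 s, set by the operator
    }
}
{code}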