Hi Andrew,
in general it's expected that the broker returns more data than the
limits, since it returns at least one MessageSet. All messages that can
be parsed are enqueued. If there's a buffer underflow because of
truncation by the broker, parsing stops at the last complete message. If
the internal librdkafka consumer queue contains at least
*queued.min.messages* (100K by default) or *queued.max.messages.kbytes*
worth of data (65536 KiB, i.e. 64 MiB, by default), fetching stops until
the queue drops below those thresholds.
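
To make that concrete, the pause condition amounts to something like the
following (a minimal Java-style sketch of the logic described above, not
librdkafka's actual C internals; all names are illustrative):

    // Fetching for a partition is paused while either threshold is reached:
    //   queued.min.messages        — default 100000 messages
    //   queued.max.messages.kbytes — default 65536 KiB (64 MiB)
    static boolean pauseFetch(long queuedMessages, long queuedKbytes,
                              long queuedMinMessages, long queuedMaxKbytes) {
        return queuedMessages >= queuedMinMessages
            || queuedKbytes >= queuedMaxKbytes;
    }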

About *queued.max.messages.kbytes*, the documentation says "This value
may be overshot by *fetch.message.max.bytes*" (alias of
*max.partition.fetch.bytes*). We could update the documentation to add
that it can also be overshot by *remote.max.partition.fetch.bytes* in
case it's set.

Of course testing is needed, but I don't see problems in theory.

Thanks Kamal and Andrew for the KIP and its review.
Emanuele

On Tue, May 13, 2025 at 7:48 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Kamal,
> Thanks for your response.
>
> I know a reasonable amount about the Apache Kafka consumer and protocol.
> In general, the limits are applied as soft limits: read some data, check
> whether the limit has been reached, and repeat until it has. It is very
> common to go slightly over the limit as a result, typically by up to one
> record batch. Consequently, the consumer doesn't pay any attention to the
> amount of data returned to it. It does pay attention to the number of
> records returned, in the way that it batches them to deliver to the
> application, but that's it.
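>
> To illustrate the pattern (a hedged Java sketch of the loop described
> above, not the broker's actual code; maxFetchBytes, batches, and response
> are illustrative names), the limit is only checked between whole batches,
> which is why a response can overshoot by up to one batch:
>
>     long bytesSoFar = 0;
>     while (bytesSoFar < maxFetchBytes && batches.hasNext()) {
>         RecordBatch batch = batches.next(); // a batch is never split
>         response.add(batch);                // the whole batch is taken...
>         bytesSoFar += batch.sizeInBytes();  // ...before the limit is rechecked
>     }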
>
> By specifying fetch.max.bytes of 50MB, the consumer might receive 50MB of
> data
> in response to the fetch. So, I think there is essentially "permission"
> from the consumer
> to return up to 50MB. If efficiency demands that you fetch at least 4MB
> from remote
> storage, keeping well within fetch.max.bytes seems fine to me. I can
> imagine that
> a broker configuration would be helpful in case the characteristics of a
> particular
> storage provider needed to be accommodated.
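>
> For example (illustrative Java only; remote.max.partition.fetch.bytes is
> the config this KIP proposes, not an existing one):
>
>     import java.util.Properties;
>     import org.apache.kafka.clients.consumer.ConsumerConfig;
>
>     Properties props = new Properties();
>     // the current defaults, spelled out
>     props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");          // fetch.max.bytes = 50MB
>     props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // max.partition.fetch.bytes = 1MB
>     // proposed: allow larger per-partition reads from remote storage
>     props.put("remote.max.partition.fetch.bytes", "4194304");              // 4MB
>
> Even at 4MB per partition, the response stays comfortably inside the
> 50MB fetch.max.bytes "permission".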
>
> I wonder if Emanuele Sabellico can add any information about how librdkafka
> manages the limits in FetchRequest and how sensitive it is to more data
> being
> returned for a partition than expected.
>
> Thanks,
> Andrew
>
> ________________________________________
> From: Kamal Chandraprakash <kamal.chandraprak...@gmail.com>
> Sent: 08 May 2025 19:24
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-1178: Introduce
> remote.max.partition.fetch.bytes in Consumer
>
> Hi Andrew,
>
> Thanks for the review!
>
> The initial idea was to introduce the configuration on the broker side,
> similar to how remote.fetch.max.wait.ms complements fetch.max.wait.ms:
>
>    - fetch.max.wait.ms — configured via ConsumerConfig
>      <https://sourcegraph.com/github.com/apache/kafka/-/blob/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java?L205>
>    - remote.fetch.max.wait.ms — configured via RemoteLogManagerConfig
>      <https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java?L185>
>      (broker-side)
>
> With KIP-74
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes>,
> max.partition.fetch.bytes
> <https://sourcegraph.com/github.com/apache/kafka/-/blob/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java?L218-224>
> is treated as a soft limit. This means the actual size of the FETCH
> response returned to the client may exceed this value, particularly when
> a single RecordBatch is larger than the configured limit.
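>
> As a worked example: with max.partition.fetch.bytes = 1 MB, a partition
> whose next RecordBatch is 3 MB is still returned in full, since a batch
> is never split on the wire; the limit is only applied between batches.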
>
> One area that remains unclear is how third-party clients behave when they
> are configured with the default values:
>
>    - max.partition.fetch.bytes = 1 MB and
>    - fetch.max.bytes = 50 MB
>
> If the broker responds with a 4 MB partition response containing multiple
> RecordBatches, does the client fail to process the records
> due to exceeding max.partition.fetch.bytes, or does it handle the larger
> response gracefully?
>
> Thanks,
> Kamal
>
> On Thu, May 8, 2025 at 7:19 PM Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
>
> > Hi Kamal,
> > Thanks for the KIP.
> >
> > While it makes a lot of sense to me to be able to control the fetching
> > from remote
> > storage to make sure it's sympathetic to the characteristics of the
> > storage provider,
> > it seems to me that extending this concept all the way to the individual
> > consumers
> > is not a good idea. You might have different consumers specifying their
> own
> > wildly different values, when really you want a consistent configuration
> > which
> > applies whenever data is fetched from remote storage. Could a broker or
> > topic
> > config be used to achieve this more effectively? It worries me whenever
> we
> > have
> > a configuration which would ideally be used by all consumers setting the
> > same
> > value. It suggests that they shouldn't be able to provide their own
> values
> > at all.
> >
> > Thanks,
> > Andrew
> > ________________________________________
> > From: Kamal Chandraprakash <kamal.chandraprak...@gmail.com>
> > Sent: 08 May 2025 12:07
> > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > Subject: [DISCUSS] KIP-1178: Introduce remote.max.partition.fetch.bytes
> in
> > Consumer
> >
> > Hi all,
> >
> > I've opened the KIP-1178 to add a new config
> > 'remote.max.partition.fetch.bytes' in the consumer. This config allows
> > the consumer to read from remote storage faster.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1178%3A+Introduce+remote.max.partition.fetch.bytes+config+in+Consumer
> >
> > Please take a look and share your thoughts.
> >
> > Thanks,
> > Kamal
> >
>
