Hi Henry and All,
Thanks for the KIP. I believe this is a feature that many people want to
have.
However, if the design target is set to 15 minutes, then, when considering
the complexity of the currently proposed solution, the cost–benefit ratio
may not be as compelling as we might hope.
That said, if our primary goal is simply to provide a delay feature with
some reasonable constraints, perhaps we could consider supporting a fixed
delay feature per topic — for instance, a topic configured with a 5-minute
delay or a 10-minute delay.
This could help avoid the need to handle multiple different delay durations
within a single topic. In that case, a fixed consumption delay could be
implemented on the client side by pausing polling, which would likely
require only limited additional code.
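To make the idea concrete, here is a minimal sketch of the client-side check, assuming a fixed per-topic delay measured from the record timestamp. The names `Record`, `remaining_delay_ms` and `split_due` are hypothetical and not part of any Kafka client; the sketch only shows the arithmetic and where polling would be paused.

```python
from dataclasses import dataclass


@dataclass
class Record:
    offset: int
    timestamp_ms: int  # record timestamp in epoch milliseconds


def remaining_delay_ms(record_timestamp_ms: int, topic_delay_ms: int, now_ms: int) -> int:
    """Milliseconds until the record becomes deliverable (0 if already due)."""
    return max(0, record_timestamp_ms + topic_delay_ms - now_ms)


def split_due(records, topic_delay_ms, now_ms):
    """Split an offset-ordered batch into (due, held_back).

    Everything from the first not-yet-due record onward is held back,
    so the application still sees records in offset order.
    """
    for i, record in enumerate(records):
        if remaining_delay_ms(record.timestamp_ms, topic_delay_ms, now_ms) > 0:
            return records[:i], records[i:]
    return list(records), []
```

The consumer would hand `due` to the application and pause polling for roughly `remaining_delay_ms` of the first held-back record before trying again.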
Anyway, I may have strayed slightly from the original focus of the
discussion, and I apologize for that.
Finally, thank you very much for raising such an interesting and
thought-provoking topic!
Regards,
Jian
On Sun, Feb 1, 2026 at 02:22, Andrew Schofield <[email protected]> wrote:
Hi Henry and others,
Thanks for the KIP. I see there’s already been lively discussion.
My initial thought is that the first version of the KIP doesn’t work with
a lot of existing Kafka features and breaks a lot of assumptions. For
example, the offset order of consumed records is no longer monotonically
increasing, there’s no support for EOS because there’s no longer a specific
offset that cleanly delineates completed transactions from open
transactions, and so on.
I think that one of the principles of a successful KIP of this kind is
that it embraces all of the Kafka features which went before. This is why
KIP-932 supports isolation level and tiered storage. There’s also one other
relevant detail of KIP-932 which is that the broker never looks within the
record batches. We don’t want to decompress compressed batches on the
broker, so we do not iterate over the records. I worry about having a
per-record delay and building an index accordingly.
You mention using delayed message support as a way of introducing delay
into message retries with share groups. I think that could be more readily
achieved in the existing share-partition code.
Chia-Ping’s suggestion for essentially considering this as an enhancement
for share groups is interesting. Regular consumers would be unaware of the
delay and they would still get records in offset order. Share consumers are
not guaranteed to get records in offset order so I think we have more
flexibility there. But I see a couple of problems with this too. First, if
delayed records are acquired by share consumers when they are not yet
intended to be delivered, they will be eating into the in-flight record
limit for the partition. If there are lots of delayed records ahead of
records with no delay, they could temporarily prevent delivery of the
undelayed records. Second, I don’t believe restricting this feature to
share groups is quite what was intended. I think you were aiming for a
general-purpose feature for delayed messages in Kafka.
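The in-flight concern can be illustrated with a toy model. This is only a sketch under simplifying assumptions: records are acquired strictly in offset order up to a fixed limit, and `acquire`, `in_flight_limit` and `deliver_at_ms` are hypothetical names, not share-partition internals.

```python
def acquire(log, in_flight_limit, now_ms):
    """Acquire records in offset order up to the in-flight limit.

    `log` is a list of (offset, deliver_at_ms) tuples. Returns
    (deliverable, parked): acquired records that are due now, and acquired
    records that are still delayed. Both count against the limit.
    """
    acquired = log[:in_flight_limit]
    deliverable = [offset for offset, deliver_at in acquired if deliver_at <= now_ms]
    parked = [offset for offset, deliver_at in acquired if deliver_at > now_ms]
    return deliverable, parked


# Five delayed records sit ahead of three undelayed ones.
log = [(offset, 60_000) for offset in range(5)] + [(offset, 0) for offset in range(5, 8)]
deliverable, parked = acquire(log, in_flight_limit=5, now_ms=1_000)
```

With a limit of 5, every in-flight slot is taken by a delayed record, so `deliverable` is empty even though offsets 5 to 7 are ready to be consumed.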
Max suggested a scheduler topic. That possibly could work, but it gets
tricky to support features like transactions. What if an application puts
a record with a long delay and a record with a short delay in the same
transaction? Are control records written onto the scheduler topic? How do
we atomically move records from the scheduler topic onto the user’s topic,
given that the user’s expected quality-of-service (EOS, idempotent produce)
would have to be honoured by the copying process? I think that a
per-producer delay is better with this design, because all records put in a
transaction would have consistent delays and thus could be grouped
together. This would be an interesting problem to solve. There’s also the
fact that the leadership of the user’s topic-partitions is not going to be
the same as the leadership of the scheduler topic-partitions. There would
be inter-broker calls, just as there are for DLQs for share groups.
Hopefully the comments spark some ideas. This is quite a tough problem to
solve neatly in Kafka.
Thanks,
Andrew
On 2026/01/31 07:18:05 Chia-Ping Tsai wrote:
Hi Henry and everyone,
Thanks for the KIP! Delayed messaging is indeed a highly requested feature.
I've been following the development of **KIP-932 (Share Groups)** and
the recent **KIP-1222 (Acquisition lock timeout renewal)**, and I’m
wondering if we can achieve the same goal by leveraging Share Consumers on
the client side, rather than modifying the Broker's storage engine.
The main concern with KIP-1277 is the complexity introduced to the
Broker (new indexes, modifying LogSegment logic) and the potential
performance penalty of breaking Zero-Copy (since the broker needs to unpack
batches to filter out delayed messages).
**Alternative Proposal: Client-Side Delay with Share Groups**
With KIP-932 and KIP-1222, we might have enough primitives to build a
robust **Client-Side Delay** mechanism that keeps the Broker stateless and
efficient:
1. **Protocol**: Producers simply add a `__kafka_delay_timestamp` header.
   The Broker treats it as a normal message and delivers it via Zero-Copy
   fetch.
2. **Client-Side Filtering**: The Share Consumer SDK parses the header.
   * If `now >= timestamp`: Process immediately.
   * If `now < timestamp`: **Do not** return to the user application.
3. **The "Hold & Renew" Strategy**:
   * Instead of `RELEASE`-ing the message back to the broker (which causes
     busy loops), the Consumer buffers the record internally.
   * Crucially, it uses the **`RENEW` action (from KIP-1222)** to maintain
     the acquisition lock without processing the message.
   * This effectively "hides" the message from other consumers while
     preventing redelivery.
4. **Graceful Degradation (Memory Protection)**:
   * To address the concern of buffering 1M+ delayed messages causing OOM:
     * The Consumer can implement an eviction policy: Drop the **payload**
       but keep the **Offset + Timestamp** in a lightweight PriorityQueue.
     * The Consumer continues to `RENEW` the lock for these offsets
       (extremely low overhead).
     * When the time comes, if the payload was dropped, issue a `RELEASE`
       to re-fetch the data from the broker.
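The four steps above could be modelled roughly as follows. This is a toy, in-memory sketch and not a Kafka client API: `DelayBuffer`, `Held`, `max_payload_bytes` and the `"PROCESS"`/`"HOLD"` labels are hypothetical names, and the actual `RENEW`/`RELEASE` acknowledgements are assumed to be sent by the caller.

```python
import heapq
from dataclasses import dataclass, field
from typing import Optional


@dataclass(order=True)
class Held:
    deliver_at_ms: int
    offset: int
    payload: Optional[bytes] = field(default=None, compare=False)


class DelayBuffer:
    """Toy model of the consumer-side buffer for delayed records."""

    def __init__(self, max_payload_bytes: int):
        self.max_payload_bytes = max_payload_bytes
        self.payload_bytes = 0
        self.heap = []  # min-heap ordered by (deliver_at_ms, offset)

    def on_fetch(self, offset, deliver_at_ms, payload, now_ms):
        """Return "PROCESS" if the record is due, else buffer it and return "HOLD"."""
        if now_ms >= deliver_at_ms:
            return "PROCESS"
        if self.payload_bytes + len(payload) > self.max_payload_bytes:
            payload = None  # over budget: keep only offset + timestamp
        else:
            self.payload_bytes += len(payload)
        heapq.heappush(self.heap, Held(deliver_at_ms, offset, payload))
        return "HOLD"

    def offsets_to_renew(self):
        """Offsets whose acquisition lock the caller should RENEW."""
        return sorted(held.offset for held in self.heap)

    def pop_due(self, now_ms):
        """Return (offset, action, payload) for every held record that is now due.

        The action is "RELEASE" when the payload was dropped and must be
        re-fetched from the broker, otherwise "PROCESS".
        """
        due = []
        while self.heap and self.heap[0].deliver_at_ms <= now_ms:
            held = heapq.heappop(self.heap)
            if held.payload is None:
                due.append((held.offset, "RELEASE", None))
            else:
                self.payload_bytes -= len(held.payload)
                due.append((held.offset, "PROCESS", held.payload))
        return due
```

A consumer loop would call `on_fetch` for each fetched record, periodically renew everything in `offsets_to_renew()`, and act on `pop_due(now_ms)`.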
**Trade-offs:**
* **Pros:**
  * **Zero Broker Changes:** No new index files, no changes to log
    segments.
  * **Performance:** Preserves Zero-Copy for fetches. The Broker only
    handles lightweight metadata (Renew RPCs).
  * **Flexibility:** Logic resides in the Client SDK.
* **Cons:**
  * **Traffic:** Consumers download delayed data ahead of time. (However,
    for the "short time window" use case mentioned in KIP-1277, this seems
    acceptable).
  * **Head-of-Line Blocking:** If the *entire* batch is delayed messages,
    it occupies the "In-flight" limit. However, KIP-1277 also targets
    "short delays," so the impact might be manageable or solved via
    "Parking Lot" topics for long delays.
Given that KIP-1277 also focuses on **short delays (up to 15 mins)**,
the Share Group approach seems to cover the use case well without the heavy
lift on the storage engine.
Would love to hear your thoughts on whether this direction has been
considered.
Best,
Chia-Ping
On 2026/01/30 07:34:09 Henry Haiying Cai via dev wrote:
Hello Kafka Developers,
I would like to start discussing KIP-1277: Support Delayed Message in Kafka.
A common queue scheduling feature is delayed messages, where a message is
not supposed to be delivered or consumed right away. One use case is a
large influx of messages or other activity in the system: the message
producer wants the messages to be consumed or processed a little later, or
wants consumption to be spread over a period of time. Another common use is
message retry handling (e.g. retries in the new Kafka Queue feature): when
a message consumer/worker cannot process a message because of a transient
failure in an external system, the worker usually wants to unacknowledge
the message and retry it later. Ideally the retry is scheduled for a later
time, usually with an exponential backoff interval.
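As a small illustration of the retry case, here is a sketch of an exponential backoff schedule. The function name `retry_delay_ms`, the 1-second base and the 15-minute cap are assumptions chosen for the example, not values defined by the KIP.

```python
def retry_delay_ms(attempt: int, base_ms: int = 1_000, cap_ms: int = 15 * 60 * 1_000) -> int:
    """Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(cap_ms, base_ms * 2 ** (attempt - 1))


# Delays for the first five retries, in milliseconds.
schedule = [retry_delay_ms(n) for n in range(1, 6)]
```

The resulting delay is what a worker would attach to the message when handing it back for a later retry.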
Because Kafka lacks support for message scheduling or delayed message
delivery, users have turned to other queuing systems for these features.
For example, users have been using AWS SQS delayed message / delayed topic
to deliver messages later within a short time frame (e.g. within 15
minutes), and DynamoDB or traditional database tables for messages with
longer delays.
We are proposing to implement delayed messages for Kafka to fill this
feature gap. Similar to SQS, the proposal focuses on delayed delivery
within a short time window (up to 15 minutes). In our use cases, most
delayed messages are delivered within 1 minute.
KIP-1277 can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1277%3A+Support+Delayed+Message+in+Kafka
Looking forward to suggestions and feedback :)
Best,
Henry Cai and Tom Thornton