lhotari commented on code in PR #24625:
URL: https://github.com/apache/pulsar/pull/24625#discussion_r2275625104
##########
pip/pip-437.md:
##########
@@ -0,0 +1,167 @@
+# PIP-437: Granular Control for Message Delivery Delays
+
+# Background knowledge
+Pulsar currently allows producers to specify deliverAt timestamps for delayed
message delivery.
+However, there's no mechanism to constrain how far into the future a message
can be scheduled. Because each producer
+can schedule arbitrary delivery times, the system becomes less predictable and
harder to support.
+
+# Motivation
+The current system lacks administrative controls on message delivery delays,
which introduces risks to cluster stability and data integrity.
+
+## Risks in the current system:
+Unbounded Resource Pressure: Excessively delayed messages must be retained in
storage for extended periods.
+More importantly, they create a long-lived state in the broker's memory that
cannot be cleared,
+leading to unpredictable resource consumption and potential garbage collection
pressure.
+
+### The Limitation:
+To protect broker memory, this tracker is capped at a configurable fixed
number of entries (e.g., 10,000).
+A large volume of messages with widely varying delivery times can easily
exhaust this capacity.
+Once the tracker is full, it stops persisting the index for new delayed
messages, retaining this state only in memory.
+If the broker restarts for any reason (e.g., crash, rolling upgrade), this
volatile in-memory state is completely lost.
+
+### The Impact:
+Upon restart, the broker's only source of truth is the last
persisted cursor position.
+This position does not account for the lost tracking information, forcing the
broker to re-dispatch messages that were already processed.
+This results in significant and difficult-to-predict message duplication for
downstream consumers.
+
+The lack of administrative controls on message delivery delays introduces
critical risks to cluster stability and data integrity.
+This proposal adds granular control at the topic and namespace
levels: a fixed delivery delay configuration and a maximum delivery
delay restriction.
+
+# Goals
+## In Scope
+- Add a new config that limits the maximum allowed delivery delay for a topic
+- Add a fixed delivery delay config for a topic
+- Add a new config that limits the maximum allowed delivery delay for a
namespace
+- Add a fixed delivery delay config for a namespace
+- Provide safe defaults and observability.
+
+# High Level Design
+- Add `maxDelayedDeliveryTimeMs` and `fixedDelayedDeliveryTimeMs` settings
at the topic and namespace levels
+- A three-tiered policy hierarchy (Topic > Namespace > Broker) for setting the
maximum allowed delivery delay
+- A new, overriding policy (Topic > Namespace) for enforcing a fixed delivery
delay
+- If both fixed-delivery-delay and max-delivery-delay policies are configured,
the fixed-delivery-delay will always take precedence, overriding any
client-side settings and ignoring the max-delivery-delay policy.
+- Default Behavior: If no policies are configured, the broker will honor the
client's requested deliverAt time, constrained only by the broker-wide
delayedDeliveryMaxDelayInMillis setting. If a policy is set at the namespace
level but not the topic level, the topic will inherit the namespace's policy.
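The precedence rules listed above can be sketched as a small resolver. This is only an illustration under stated assumptions, not code from the PR: the class `DelayPolicyResolver`, its `resolve` method, and the use of `null` for "not configured" are hypothetical, and all values are assumed to be in milliseconds.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of the Pulsar codebase. */
final class DelayPolicyResolver {

    /** Effective policy after walking the hierarchy. */
    static final class EffectivePolicy {
        final OptionalLong fixedDelayMs; // empty when no fixed delay is configured
        final long maxDelayMs;           // always resolved, falls back to the broker value

        EffectivePolicy(OptionalLong fixedDelayMs, long maxDelayMs) {
            this.fixedDelayMs = fixedDelayMs;
            this.maxDelayMs = maxDelayMs;
        }
    }

    private DelayPolicyResolver() {}

    // Returns the first configured (non-null) value, most specific level first.
    private static OptionalLong firstSet(Long moreSpecific, Long lessSpecific) {
        if (moreSpecific != null) {
            return OptionalLong.of(moreSpecific);
        }
        if (lessSpecific != null) {
            return OptionalLong.of(lessSpecific);
        }
        return OptionalLong.empty();
    }

    /**
     * Fixed delay: Topic > Namespace (no broker level).
     * Max delay:   Topic > Namespace > Broker.
     */
    static EffectivePolicy resolve(Long topicFixedMs, Long namespaceFixedMs,
                                   Long topicMaxMs, Long namespaceMaxMs,
                                   long brokerMaxMs) {
        OptionalLong fixed = firstSet(topicFixedMs, namespaceFixedMs);
        long max = firstSet(topicMaxMs, namespaceMaxMs).orElse(brokerMaxMs);
        return new EffectivePolicy(fixed, max);
    }
}
```

When `fixedDelayMs` is present, a caller would apply it and ignore `maxDelayMs`, matching the rule that the fixed delay takes precedence.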
+
+# Detailed Design
+## Design & Implementation Details
+In the broker publish path (Producer.sendMessage), reject messages whose
requested delay exceeds the configured maximum.
+If a fixed delay is configured for future delivery, ignore the client's
requested delivery time.
+```
+if (messageMetadata.hasDeliverAtTime()) {
+    // Requested delay relative to the broker clock, in milliseconds
+    long delay = messageMetadata.getDeliverAtTime() - System.currentTimeMillis();
+    long maxDelay = getTopicMaxDeliverAtTime(topic);
+    if (delay > maxDelay) {
+        // Reject the message
+        ctx.writeAndFlush(new CommandSendError(...));
+        return;
+    }
+}
+```
+1. **Data Model Changes**:
+ * Add `maxDelayedDeliveryTimeInSeconds` and
`fixedDelayedDeliveryTimeInSeconds` fields to `TopicPolicies.java` and
`Policies.java` to store the configuration values at the topic and namespace
levels, respectively.
+ * Update `HierarchyTopicPolicies.java` to include the new fields in the
effective policy resolution.
+2. **Enforcement Logic**:
+ * Modify the `publishMessage` method in `Producer.java` to:
+ * Enforce the `maxDelayedDeliveryTimeInSeconds` policy by rejecting
messages with a delivery delay exceeding the configured maximum.
+ * Override the message's `deliverAtTime` with the
`fixedDelayedDeliveryTimeInSeconds` if the fixed delay policy is enabled.
+3. **Admin API**:
+ * Add new subcommands to `CmdTopicPolicies.java` and
`CmdNamespaces.java` to manage the new policies through the admin CLI.
+ * Implement new REST API endpoints for setting, getting, and removing
the policies.
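The enforcement step described in item 2 can be sketched in isolation as follows. This is a hedged illustration, not the PR's implementation: `DelayEnforcer`, `Decision`, and `Outcome` are hypothetical names, values are assumed to be in milliseconds, a non-positive maximum is assumed to mean "no limit", and the fixed delay is assumed to be measured from the broker's current time.

```java
import java.util.OptionalLong;

/** Hypothetical enforcement sketch; not part of the Pulsar codebase. */
final class DelayEnforcer {

    enum Outcome { ACCEPT, REJECT }

    static final class Decision {
        final Outcome outcome;
        final long deliverAtMs; // delivery timestamp to use when outcome == ACCEPT

        Decision(Outcome outcome, long deliverAtMs) {
            this.outcome = outcome;
            this.deliverAtMs = deliverAtMs;
        }
    }

    private DelayEnforcer() {}

    static Decision enforce(long requestedDeliverAtMs, long nowMs,
                            OptionalLong fixedDelayMs, long maxDelayMs) {
        // A fixed delay overrides the client's value and bypasses the max check.
        if (fixedDelayMs.isPresent()) {
            return new Decision(Outcome.ACCEPT, nowMs + fixedDelayMs.getAsLong());
        }
        long requestedDelayMs = requestedDeliverAtMs - nowMs;
        // Assumption: maxDelayMs <= 0 disables the limit.
        if (maxDelayMs > 0 && requestedDelayMs > maxDelayMs) {
            return new Decision(Outcome.REJECT, requestedDeliverAtMs);
        }
        return new Decision(Outcome.ACCEPT, requestedDeliverAtMs);
    }
}
```

Passing `nowMs` explicitly, rather than reading the clock inside the method, keeps the decision deterministic and easy to unit test.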
+
+## Public-facing Changes
+
+### Public API
+#### Topic-Level
+| Method   | Endpoint | Description |
+| :------- | :------- | :---------- |
+| `PUT`    | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/maxDelayedDeliveryTime` | Sets the maximum allowed delivery delay for a topic. |
+| `PUT`    | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/fixedDelayedDeliveryTime` | Sets the fixed delivery delay for a topic. |
+| `GET`    | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/maxDelayedDeliveryTime` | Gets the maximum allowed delivery delay for a topic. |
+| `GET`    | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/fixedDelayedDeliveryTime` | Gets the fixed delivery delay for a topic. |
+| `DELETE` | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/maxDelayedDeliveryTime` | Removes the maximum allowed delivery delay for a topic, reverting to namespace default. |
+| `DELETE` | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/fixedDelayedDeliveryTime` | Removes the fixed delivery delay for a topic, reverting to the namespace default. |
+
+#### Namespace-Level
+| Method   | Endpoint | Description |
+| :------- | :------- | :---------- |
+| `PUT`    | `/admin/v2/namespaces/{tenant}/{namespace}/maxDelayedDeliveryTime` | Sets the maximum allowed delivery delay for a namespace. |
+| `PUT`    | `/admin/v2/namespaces/{tenant}/{namespace}/fixedDelayedDeliveryTime` | Sets the fixed delivery delay for a namespace. |
+| `GET`    | `/admin/v2/namespaces/{tenant}/{namespace}/maxDelayedDeliveryTime` | Gets the maximum allowed delivery delay for a namespace. |
+| `GET`    | `/admin/v2/namespaces/{tenant}/{namespace}/fixedDelayedDeliveryTime` | Gets the fixed delivery delay for a namespace. |
+| `DELETE` | `/admin/v2/namespaces/{tenant}/{namespace}/maxDelayedDeliveryTime` | Removes the maximum allowed delivery delay for a namespace, reverting to the broker default. |
+| `DELETE` | `/admin/v2/namespaces/{tenant}/{namespace}/fixedDelayedDeliveryTime` | Removes the fixed delivery delay for a namespace. |
+
+### Binary protocol
+
+### Configuration
+
+### CLI
+New subcommands added to `pulsar-admin topics`:
+* `set-max-delivery-delay`
+* `get-max-delivery-delay`
+* `remove-max-delivery-delay`
+* `set-fixed-delivery-delay`
+* `get-fixed-delivery-delay`
+* `remove-fixed-delivery-delay`
+
+New subcommands added to `pulsar-admin namespaces`:
+* `set-max-delivery-delay`
+* `get-max-delivery-delay`
+* `remove-max-delivery-delay`
+* `set-fixed-delivery-delay`
+* `get-fixed-delivery-delay`
+* `remove-fixed-delivery-delay`
Review Comment:
There's a related proposal, "PIP-315: Configurable max delay limit for delayed
delivery", https://github.com/apache/pulsar/blob/master/pip/pip-315.md,
implemented by https://github.com/apache/pulsar/pull/21798 . It would be great
if the new proposal followed a similar approach for the API.
##########
pip/pip-437.md:
##########
@@ -0,0 +1,167 @@
+# Goals
+## In Scope
+- Add a new config that limits the maximum allowed delivery delay for a topic
+- Add a fixed delivery delay config for a topic
+- Add a new config that limits the maximum allowed delivery delay for a
namespace
Review Comment:
I think that the maximum allowed delivery delay is already covered by
["PIP-315: Configurable max delay limit for delayed
delivery"](https://github.com/apache/pulsar/blob/master/pip/pip-315.md)