lhotari commented on code in PR #24625: URL: https://github.com/apache/pulsar/pull/24625#discussion_r2281451926
##########
pip/pip-437.md:
##########
@@ -0,0 +1,134 @@
+# PIP-437: Granular and Fixed-Delay Policies for Message Delivery
+
+# Background
+Pulsar's delayed delivery feature allows producers to schedule messages to be delivered at a future time. To provide administrative control and prevent abuse, **PIP-315** introduced a `maxDeliveryDelayInMillis` policy.
+While this provides an important safeguard by setting an *upper bound*, it does not address all administrative needs. There is currently no way to enforce a *specific, non-overridable* delay for all messages on a topic.
+
+# Motivation
+The current system lacks administrative controls on message delivery delays, which introduces risks to cluster stability and data integrity.
+
+## Risks in the current system:
+Unbounded Resource Pressure: Excessively delayed messages must be retained in storage for extended periods.
+More importantly, they create a long-lived state in the broker's memory that cannot be cleared,
+leading to unpredictable resource consumption and potential garbage collection pressure
+
+### The Limitation:
+To protect broker memory, this tracker is capped at a configurable fixed number of entries (e.g., 10,000).

Review Comment:
As background, it would be useful to reference the configuration settings behind this limit. There are some details in discussion https://github.com/apache/pulsar/discussions/23990.

##########
pip/pip-437.md:
##########
@@ -0,0 +1,134 @@
+# PIP-437: Granular and Fixed-Delay Policies for Message Delivery
+
+# Background
+Pulsar's delayed delivery feature allows producers to schedule messages to be delivered at a future time. To provide administrative control and prevent abuse, **PIP-315** introduced a `maxDeliveryDelayInMillis` policy.
+While this provides an important safeguard by setting an *upper bound*, it does not address all administrative needs. There is currently no way to enforce a *specific, non-overridable* delay for all messages on a topic.
+
+# Motivation
+The current system lacks administrative controls on message delivery delays, which introduces risks to cluster stability and data integrity.
+
+## Risks in the current system:
+Unbounded Resource Pressure: Excessively delayed messages must be retained in storage for extended periods.
+More importantly, they create a long-lived state in the broker's memory that cannot be cleared,
+leading to unpredictable resource consumption and potential garbage collection pressure
+
+### The Limitation:
+To protect broker memory, this tracker is capped at a configurable fixed number of entries (e.g., 10,000).
+A large volume of messages with widely varying delivery times can easily exhaust this capacity.
+Once the tracker is full, it stops persisting the index for new delayed messages, retaining this state only in memory.
+If the broker restarts for any reason (e.g., crash, rolling upgrade), this volatile in-memory state is completely lost.
+
+### The Impact: Upon restart, the broker's only source of truth is the last persisted cursor position.
+This position does not account for the lost tracking information, forcing the broker to re-dispatch messages that were already processed.
+This results in significant and difficult-to-predict message duplication for downstream consumers.
+
+The lack of administrative controls on message delivery delays introduces critical risks to cluster stability and data integrity.
+This proposal aims to provide granular control at the topic and namespace levels to add a new fixed delay delivery configuration:
+
+1. **Prevent Message Duplication:** By forcing all messages on a topic to have the exact same delay,it prevents the delayed message tracker from becoming overwhelmed and eliminates the risk of message duplication upon broker restart, leading to a more stable and predictable system.
+2. **Enforcing Compliance and Business Rules:** Certain workflows may require certain control that require the enforcement at the broker level
+3. **Simplifying Producer Configuration:** By setting a fixed delay on the topic, the responsibility for managing the delay logic is shifted from the client to the broker, reducing the risk of client-side misconfiguration.
+
+# Goals
+## In Scope
+- Introduce a new `fixed-delivery-delay` policy, configurable at the namespace and topic levels, to enforce a mandatory delivery delay.
+- Enhance the existing `/delayedDelivery` admin API endpoints and `pulsar-admin` commands to manage both the existing `maxDeliveryDelayInMillis` and the new `fixedDeliveryDelayInMillis` policies within the same policy group.
+- Ensure the `fixed-delivery-delay` policy takes precedence over any client-specified delay and the existing `max-delivery-delay` policy.
+
+# High Level Design
+This proposal will enhance the existing `DelayedDeliveryPolicies` object to include a new `fixedDeliveryDelayInMillis` field.
+This new policy will follow Pulsar's standard hierarchical model, allowing it to be set at the namespace level and overridden at the topic level.
+The core logic will be implemented with the following precedence:
+
+1. **If `fixed-delivery-delay` is set:** The broker will ignore any `deliverAt` time sent by the producer and will override it by calculating `publish_time + fixed_delay`. The `max-delivery-delay` policy is ignored.
+2. **If `fixed-delivery-delay` is NOT set, but `max-delivery-delay` is:** The broker will validate the producer's requested `deliverAt` time against the `max-delivery-delay` policy, rejecting the message if it exceeds the limit.
+3. **If neither policy is set:** The system behaves as it does today, honoring the client's requested `deliverAt` time, constrained only by the global broker-level setting.
+
+# Detailed Design
+## Design & Implementation Details
+
+1. **Data Model Changes**:
+ * Add a new `fixedDeliveryDelayInMillis` field to the `DelayedDeliveryPolicies.java` class. This class is already used for the existing `maxDeliveryDelayInMillis` policy.
+ * Update `HierarchyTopicPolicies.java` to resolve the effective `fixedDeliveryDelayInMillis` for a topic, respecting the topic-over-namespace hierarchy.
+
+2. **Enforcement Logic**:
+ * The enforcement logic will be placed in `PersistentTopic.java` within the `publishMessage` and `publishTxnMessage` methods, right before the existing `isExceedMaximumDeliveryDelay` check.
+ * This logic will modify the `MessageMetadata` of the incoming message *before* it is passed to the managed ledger for persistence.
+
+## Public-facing Changes
+### Public API
+#### Topic-Level Policies
+| Method | Endpoint | Description |
+| :------- | :------------------------------------------------------------------------ |:-----------------------------------------------------------------------------------------------------------------------------------------|
+| `POST` | `/admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery` | Sets or updates the delayed delivery policies for the topic.To disable a policy, a field can be set to `0`. |
+| `GET` | `/admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery` | Gets the configured delayed delivery policies for the topic which optionally include the fixedDeliveryDelayInMillis if configured |
+
+#### Namespace-Level Policies
+| Method | Endpoint | Description |
+| :------- | :---------------------------------------------------------------- |:-----------------------------------------------------------------|
+| `POST` | `/admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery` | Sets or updates the delayed delivery policies for the namespace. To disable a policy, a field can be set to `0`.|
+| `GET` | `/admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery` | Gets the configured delayed delivery policies for the namespace which optionally include the fixedDeliveryDelayInMillis if configured |
+
+### Binary protocol
+
+### Configuration
+
+### CLI
+* The existing `set-delayed-delivery` command in `CmdTopicPolicies.java` and `CmdNamespaces.java` will be updated with a new optional parameter: `--fixed-delay` (or `-fd`).
+* The REST endpoints under `/delayedDelivery` will be updated to accept and return the new `fixedDeliveryDelayInMillis` field in their JSON payload.
+
+### Metrics
+
+To provide visibility into the enforcement of the new policies, we will introduce a new counter metric.
+* **Full Name**: `pulsar.broker.topic.messages.delayed.rejected`

Review Comment:
It would be useful to mention that this metric is for the `maxDeliveryDelayInMillis` policy added in PIP-315.

##########
pip/pip-437.md:
##########
@@ -0,0 +1,134 @@
+# PIP-437: Granular and Fixed-Delay Policies for Message Delivery
+
+# Background
+Pulsar's delayed delivery feature allows producers to schedule messages to be delivered at a future time. To provide administrative control and prevent abuse, **PIP-315** introduced a `maxDeliveryDelayInMillis` policy.
+While this provides an important safeguard by setting an *upper bound*, it does not address all administrative needs. There is currently no way to enforce a *specific, non-overridable* delay for all messages on a topic.
+
+# Motivation
+The current system lacks administrative controls on message delivery delays, which introduces risks to cluster stability and data integrity.
+
+## Risks in the current system:
+Unbounded Resource Pressure: Excessively delayed messages must be retained in storage for extended periods.
+More importantly, they create a long-lived state in the broker's memory that cannot be cleared,
+leading to unpredictable resource consumption and potential garbage collection pressure
+
+### The Limitation:
+To protect broker memory, this tracker is capped at a configurable fixed number of entries (e.g., 10,000).
+A large volume of messages with widely varying delivery times can easily exhaust this capacity.
+Once the tracker is full, it stops persisting the index for new delayed messages, retaining this state only in memory.

Review Comment:
Which tracker is this referring to? Neither `InMemoryDelayedDeliveryTracker` nor `BucketDelayedDeliveryTracker` has a limit of its own.

In the case of `InMemoryDelayedDeliveryTracker`, it will always discard the in-memory state when all consumers have disconnected and when the first consumer arrives while the consumer list is empty. `tracker.clear()` is called on line 198: https://github.com/apache/pulsar/blob/f2618c15bb5a9f3fcb577068584df5d0e2e4f335/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L179-L201

In this case, `InMemoryDelayedDeliveryTracker` will always read all messages from the "mark delete position" (since `cursor.rewind()` moves the "read position" to the "mark delete position").

I assume that the limit that you are referring to is the `managedLedgerMaxUnackedRangesToPersist` limit (also referred to in the discussion at https://github.com/apache/pulsar/discussions/23990).

##########
pip/pip-437.md:
##########
@@ -0,0 +1,134 @@
+# PIP-437: Granular and Fixed-Delay Policies for Message Delivery
+
+# Background
+Pulsar's delayed delivery feature allows producers to schedule messages to be delivered at a future time. To provide administrative control and prevent abuse, **PIP-315** introduced a `maxDeliveryDelayInMillis` policy.
+While this provides an important safeguard by setting an *upper bound*, it does not address all administrative needs. There is currently no way to enforce a *specific, non-overridable* delay for all messages on a topic.
+
+# Motivation
+The current system lacks administrative controls on message delivery delays, which introduces risks to cluster stability and data integrity.
+
+## Risks in the current system:
+Unbounded Resource Pressure: Excessively delayed messages must be retained in storage for extended periods.
+More importantly, they create a long-lived state in the broker's memory that cannot be cleared,
+leading to unpredictable resource consumption and potential garbage collection pressure
+
+### The Limitation:
+To protect broker memory, this tracker is capped at a configurable fixed number of entries (e.g., 10,000).
+A large volume of messages with widely varying delivery times can easily exhaust this capacity.
+Once the tracker is full, it stops persisting the index for new delayed messages, retaining this state only in memory.
+If the broker restarts for any reason (e.g., crash, rolling upgrade), this volatile in-memory state is completely lost.
+
+### The Impact: Upon restart, the broker's only source of truth is the last persisted cursor position.
+This position does not account for the lost tracking information, forcing the broker to re-dispatch messages that were already processed.

Review Comment:
Please be more specific here about what the "lost tracking information" is (similar to previous comments).
