This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7547fab3b44 [improve][pip] PIP-437: Granular and Fixed-Delay Policies for Message Delivery (#24625)
7547fab3b44 is described below

commit 7547fab3b441d9062fa6f67a3a3d50778c37e15e
Author: Christina Wang <[email protected]>
AuthorDate: Tue Sep 9 14:32:20 2025 -0700

    [improve][pip] PIP-437: Granular and Fixed-Delay Policies for Message Delivery (#24625)
    
    Co-authored-by: Christina <[email protected]>
---
 pip/pip-437.md | 142 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 142 insertions(+)

diff --git a/pip/pip-437.md b/pip/pip-437.md
new file mode 100644
index 00000000000..ae85360ccd6
--- /dev/null
+++ b/pip/pip-437.md
@@ -0,0 +1,142 @@
+# PIP-437: Granular and Fixed-Delay Policies for Message Delivery
+
+# Background
+Pulsar's delayed delivery feature allows producers to schedule messages to be delivered at a future time. To provide administrative control and prevent abuse, **PIP-315** introduced a `maxDeliveryDelayInMillis` policy.
+While this provides an important safeguard by setting an *upper bound*, it does not address all administrative needs. There is currently no way to enforce a *specific, non-overridable* delay for all messages on a topic.
+
+# Motivation
+
+### The Limitation:
+Pulsar's delayed message delivery relies on tracking individual messages until their delivery time. This tracking state is persisted as part of the subscription's cursor metadata,
+which records gaps or "holes" for unacknowledged messages. The Pulsar Managed Ledger has a hard limit on the number of disjoint unacknowledged ranges it can persist for a cursor,
+configured by `managedLedgerMaxUnackedRangesToPersist` (defaulting to 10,000).
+A large volume of messages with widely varying delivery times can easily exhaust this capacity, because each delayed message can leave a separate unacknowledged hole.
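The effect of delay variance on cursor state can be illustrated with a small model. This is a simplified sketch and not Pulsar code. It makes three assumptions:

- Message `i` is published at time `i` ms.
- The consumer acknowledges each message as soon as it becomes deliverable.
- The mark-delete position covers the contiguous acknowledged prefix, so only acknowledged ranges after the first unacknowledged message must be tracked individually.

`AckRangeModel` and its methods are hypothetical names.

```java
import java.util.Random;

// Hypothetical model for illustration only; not part of Pulsar.
class AckRangeModel {

    // Counts acknowledged ranges beyond the contiguous acknowledged prefix,
    // i.e. the ranges a cursor would have to track individually at time `now`.
    // Assumes a message is acknowledged as soon as deliverAt <= now.
    static int countTrackedAckRanges(long[] deliverAt, long now) {
        int i = 0;
        // Skip the contiguous acknowledged prefix (covered by the mark-delete position).
        while (i < deliverAt.length && deliverAt[i] <= now) {
            i++;
        }
        int ranges = 0;
        boolean inAckedRun = false;
        for (; i < deliverAt.length; i++) {
            boolean acked = deliverAt[i] <= now;
            if (acked && !inAckedRun) {
                ranges++; // a new acknowledged range starts after a hole
            }
            inAckedRun = acked;
        }
        return ranges;
    }

    // Every message gets the same delay, so delivery order equals publish order.
    static long[] fixedDelays(int n, long delayMs) {
        long[] deliverAt = new long[n];
        for (int i = 0; i < n; i++) {
            deliverAt[i] = i + delayMs;
        }
        return deliverAt;
    }

    // Each message gets an independent delay in [0, maxDelayMs).
    static long[] randomDelays(int n, int maxDelayMs, long seed) {
        Random random = new Random(seed);
        long[] deliverAt = new long[n];
        for (int i = 0; i < n; i++) {
            deliverAt[i] = i + random.nextInt(maxDelayMs);
        }
        return deliverAt;
    }

    public static void main(String[] args) {
        int n = 8_000;   // messages published 1 ms apart
        long now = 8_000; // observe once all of them have been published
        System.out.println("fixed delay:   " + countTrackedAckRanges(fixedDelays(n, 5_000), now));
        System.out.println("varying delay: " + countTrackedAckRanges(randomDelays(n, 20_000, 42L), now));
    }
}
```

With a fixed delay, messages become deliverable in publish order. Acknowledgments therefore only extend the contiguous prefix, and the count is 0 at any observation time. With varying delays, acknowledged and unacknowledged messages interleave, so the number of individually tracked ranges grows with the number of messages in flight.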
+
+### The Impact:
+Once the `managedLedgerMaxUnackedRangesToPersist` limit is breached, the broker stops persisting the cursor's state and maintains it only in memory. This in-memory state is volatile and is completely lost if the broker restarts for any reason.
+Upon restart, the broker's only source of truth is the last successfully persisted cursor position. This position does not account for the lost tracking information, forcing the broker to re-dispatch all messages that were being tracked in memory.
+This results in significant and difficult-to-predict message duplication for downstream consumers.
+
+The lack of administrative controls on message delivery delays introduces critical risks to cluster stability and data integrity.
+This proposal provides granular control at the topic and namespace levels by adding a new fixed-delay delivery configuration, with the following benefits:
+
+1. **Prevent Message Duplication:** By forcing all messages on a topic to have the exact same delay, the policy prevents the delayed message tracker from becoming overwhelmed and eliminates the risk of message duplication upon broker restart, leading to a more stable and predictable system.
+2. **Enable High-Scale Scheduling:** When all messages in a topic have the same delay, the broker has significantly less state to manage. This allows topics to be used as highly efficient, high-scale schedulers. This pattern works exceptionally well with the `InMemoryDelayedDeliveryTracker` and its `delayedDeliveryFixedDelayDetectionLookahead` feature (introduced in #16609, #17907), as the broker can avoid building a large, complex index of individual delayed messages.
+3. **Enforce Compliance and Business Rules:** Some workflows require a mandatory delay that must be enforced at the broker level rather than left to each producer.
+4. **Simplify Producer Configuration:** By setting a fixed delay on the topic, the responsibility for managing the delay logic is shifted from the client to the broker, reducing the risk of client-side misconfiguration.
+
+# Goals
+## In Scope
+-   Introduce a new `fixed-delivery-delay` policy, configurable at the namespace and topic levels, to enforce a mandatory delivery delay.
+-   Enhance the existing `/delayedDelivery` admin API endpoints and `pulsar-admin` commands to manage both the existing `maxDeliveryDelayInMillis` and the new `fixedDeliveryDelayInMillis` policies within the same policy group.
+-   Ensure the `fixed-delivery-delay` policy takes precedence over any client-specified delay and the existing `max-delivery-delay` policy.
+
+# High Level Design
+This proposal will enhance the existing `DelayedDeliveryPolicies` object to include a new `fixedDeliveryDelayInMillis` field.
+This new policy will follow Pulsar's standard hierarchical model, allowing it to be set at the namespace level and overridden at the topic level.
+The core logic will be implemented with the following precedence:
+
+1.  **If `fixed-delivery-delay` is set:** The broker will ignore any `deliverAt` time sent by the producer and will set the delivery time to `publish_time + fixed_delay`. The `max-delivery-delay` policy is ignored.
+2.  **If `fixed-delivery-delay` is NOT set, but `max-delivery-delay` is:** The broker will validate the producer's requested `deliverAt` time against the `max-delivery-delay` policy, rejecting the message if it exceeds the limit.
+3.  **If neither policy is set:** The system behaves as it does today, honoring the client's requested `deliverAt` time, constrained only by the global broker-level setting.
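The precedence rules can be sketched as a pure function. This is a minimal illustration under four assumptions:

- Policy values of `0` mean "not set", matching the `0`-disables convention of the admin API.
- The maximum is measured relative to the publish time.
- The global broker-level limit of rule 3 is left out.
- `DeliveryDelayPrecedence`, `Decision` and `apply` are hypothetical names, not Pulsar APIs.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the three precedence rules; not Pulsar's actual code.
final class DeliveryDelayPrecedence {

    // accepted:   false when the message would be rejected (rule 2).
    // deliverAt:  the delivery time that would be persisted; empty means immediate delivery.
    // overridden: true when a client-specified deliverAt was replaced (rule 1).
    record Decision(boolean accepted, OptionalLong deliverAt, boolean overridden) { }

    // Policy values of 0 (or less) are treated as "not set" in this sketch.
    static Decision apply(long publishTime, OptionalLong clientDeliverAt,
                          long fixedDelayMs, long maxDelayMs) {
        // Rule 1: a fixed delay replaces any client value; max-delivery-delay is not checked.
        if (fixedDelayMs > 0) {
            return new Decision(true, OptionalLong.of(publishTime + fixedDelayMs),
                    clientDeliverAt.isPresent());
        }
        // Rule 2: only a maximum is set; reject requests whose delay exceeds it.
        if (maxDelayMs > 0 && clientDeliverAt.isPresent()
                && clientDeliverAt.getAsLong() - publishTime > maxDelayMs) {
            return new Decision(false, clientDeliverAt, false);
        }
        // Rule 3 (or rule 2 within the limit): honor the client's request unchanged.
        return new Decision(true, clientDeliverAt, false);
    }

    public static void main(String[] args) {
        long now = 1_000L;
        System.out.println(apply(now, OptionalLong.of(now + 500), 60_000, 100)); // rule 1
        System.out.println(apply(now, OptionalLong.of(now + 500), 0, 100));      // rule 2, rejected
        System.out.println(apply(now, OptionalLong.empty(), 0, 0));              // rule 3
    }
}
```

Keeping the decision side-effect free makes each rule easy to test in isolation. The `overridden` flag corresponds to the situation that the proposed `fixed.delay.overridden` metric counts.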
+
+# Detailed Design
+## Design & Implementation Details
+
+1.  **Data Model Changes**:
+    *   Add a new `fixedDeliveryDelayInMillis` field to the `DelayedDeliveryPolicies.java` class. This class is already used for the existing `maxDeliveryDelayInMillis` policy.
+    *   Update `HierarchyTopicPolicies.java` to resolve the effective `fixedDeliveryDelayInMillis` for a topic, respecting the topic-over-namespace hierarchy.
+
+2.  **Enforcement Logic**:
+    *   The enforcement logic will be placed in `PersistentTopic.java` within the `publishMessage` and `publishTxnMessage` methods, right before the existing `isExceedMaximumDeliveryDelay` check.
+    *   This logic will modify the `MessageMetadata` of the incoming message *before* it is passed to the managed ledger for persistence.
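The two detailed-design steps can be sketched together. The first step resolves the effective fixed delay with topic-over-namespace precedence. The second rewrites the message metadata before it is handed to persistence.

This is a simplified illustration, not Pulsar code:

- `Metadata` is a hypothetical stand-in for `MessageMetadata`.
- A list stands in for the managed ledger.
- Treating an explicit topic-level `0` as disabling an inherited namespace value is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the two detailed-design steps; all names are illustrative.
final class FixedDelayPublishSketch {

    // Simplified stand-in for the broker's MessageMetadata.
    static final class Metadata {
        final long publishTime;
        Long deliverAtTime; // null when the producer did not request a delay

        Metadata(long publishTime, Long deliverAtTime) {
            this.publishTime = publishTime;
            this.deliverAtTime = deliverAtTime;
        }
    }

    // Step 1: topic-over-namespace resolution. A value present at the topic level wins;
    // otherwise the namespace value applies. 0 (or absent everywhere) means disabled.
    static long effectiveFixedDelay(Optional<Long> topicLevel, Optional<Long> namespaceLevel) {
        return topicLevel.or(() -> namespaceLevel).orElse(0L);
    }

    // Stand-in for the managed ledger: records the deliverAt value that would be persisted.
    final List<Long> persistedDeliverAt = new ArrayList<>();
    private final long fixedDelayMs;

    FixedDelayPublishSketch(Optional<Long> topicLevel, Optional<Long> namespaceLevel) {
        this.fixedDelayMs = effectiveFixedDelay(topicLevel, namespaceLevel);
    }

    // Step 2: rewrite the metadata *before* handing the message to persistence.
    void publishMessage(Metadata metadata) {
        if (fixedDelayMs > 0) {
            metadata.deliverAtTime = metadata.publishTime + fixedDelayMs;
        }
        // Per the precedence rules, the existing maximum-delay check would run next
        // only when no fixed delay applies (omitted here).
        persistedDeliverAt.add(metadata.deliverAtTime);
    }
}
```

Because the rewrite happens before persistence, every stored message already carries its final delivery time. Consumers and the delayed delivery tracker therefore never see the producer's original value.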
+
+## Public-facing Changes
+### Public API
+#### Topic-Level Policies
+| Method | Endpoint | Description |
+| :----- | :------- | :---------- |
+| `POST` | `/admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery` | Sets or updates the delayed delivery policies for the topic. To disable a policy, set its field to `0`. |
+| `GET` | `/admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery` | Gets the configured delayed delivery policies for the topic, including `fixedDeliveryDelayInMillis` if configured. |
+
+#### Namespace-Level Policies
+| Method | Endpoint | Description |
+| :----- | :------- | :---------- |
+| `POST` | `/admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery` | Sets or updates the delayed delivery policies for the namespace. To disable a policy, set its field to `0`. |
+| `GET` | `/admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery` | Gets the configured delayed delivery policies for the namespace, including `fixedDeliveryDelayInMillis` if configured. |
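A request to the topic-level endpoint could look like the following sketch, which only builds the request with the JDK's `java.net.http` client. It makes three assumptions:

- The existing `DelayedDeliveryPolicies` JSON fields are named `active` and `tickTime`.
- The admin URL is a placeholder.
- Authentication is omitted.

`DelayedDeliveryRequests` is a hypothetical helper.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; field names other than the two delay fields are assumptions.
final class DelayedDeliveryRequests {

    // JSON body for the /delayedDelivery endpoints. A value of 0 disables a policy.
    static String body(boolean active, long tickTimeMs, long maxDelayMs, long fixedDelayMs) {
        return "{\"active\":" + active
                + ",\"tickTime\":" + tickTimeMs
                + ",\"maxDeliveryDelayInMillis\":" + maxDelayMs
                + ",\"fixedDeliveryDelayInMillis\":" + fixedDelayMs + "}";
    }

    // POST request for the topic-level endpoint in the table above.
    static HttpRequest setTopicPolicy(String adminUrl, String tenant, String namespace,
                                      String topic, String json) {
        URI uri = URI.create(adminUrl + "/admin/v2/persistent/" + tenant + "/" + namespace
                + "/" + topic + "/delayedDelivery");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

For example, `body(true, 1000, 0, 60000)` describes an active policy with a 60-second fixed delay and no maximum. Sending the resulting request with `HttpClient.send` would apply it to the topic.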
+
+### Binary protocol
+
+### Configuration
+
+### CLI
+*   The existing `set-delayed-delivery` command in `CmdTopicPolicies.java` and `CmdNamespaces.java` will be updated with a new optional parameter: `--fixed-delay` (or `-fd`).
+*   The REST endpoints under `/delayedDelivery` will be updated to accept and return the new `fixedDeliveryDelayInMillis` field in their JSON payload.
+
+### Metrics
+
+To provide visibility into the enforcement of the message delay policies, a new counter metric will be introduced for `maxDeliveryDelayInMillis`:
+*   **Full Name**: `pulsar.broker.topic.messages.delayed.rejected`
+*   **Description**: A counter for the total number of messages rejected because their delivery delay exceeded the configured maximum (`max-delivery-delay`). This helps administrators monitor when the policy is being enforced and identify misconfigured producers.
+*   **Attributes (Labels)**:
+    *   `pulsar.cluster`: The cluster where the broker is running.
+    *   `pulsar.namespace`: The namespace of the topic.
+    *   `pulsar.topic`: The specific topic where the message was rejected.
+*   **Unit**: `{message}` (A standard unit for a count of messages).
+
+To provide visibility into the enforcement of the fixed delivery delay policy, a new counter metric will be introduced for `fixedDeliveryDelayInMillis`:
+*   **Full Name**: `pulsar.broker.topic.messages.fixed.delay.overridden`
+*   **Description**: A counter for the total number of messages where a client-specified `deliverAt` time was overridden by the topic's `fixed-delivery-delay` policy. This provides direct observability into how often the policy is being enforced against client-side settings.
+*   **Attributes (Labels)**:
+    *   `pulsar.cluster`: The cluster where the broker is running.
+    *   `pulsar.namespace`: The namespace of the topic.
+    *   `pulsar.topic`: The specific topic where the delivery time was overridden.
+*   **Unit**: `{message}`.
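The counting semantics of the two metrics can be sketched in plain Java. Each distinct set of `pulsar.cluster`, `pulsar.namespace` and `pulsar.topic` attributes gets one monotonically increasing count. This is an illustration only: a broker implementation would presumably register the counters with its metrics library rather than keep maps. `DelayPolicyCounters` and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Plain-Java stand-in for the two proposed counters; not the broker's metrics code.
final class DelayPolicyCounters {

    // The three attributes shared by both metrics.
    record Attributes(String cluster, String namespace, String topic) { }

    // pulsar.broker.topic.messages.delayed.rejected
    private final Map<Attributes, LongAdder> delayedRejected = new ConcurrentHashMap<>();
    // pulsar.broker.topic.messages.fixed.delay.overridden
    private final Map<Attributes, LongAdder> fixedDelayOverridden = new ConcurrentHashMap<>();

    void recordRejected(Attributes attributes) {
        delayedRejected.computeIfAbsent(attributes, k -> new LongAdder()).increment();
    }

    void recordOverridden(Attributes attributes) {
        fixedDelayOverridden.computeIfAbsent(attributes, k -> new LongAdder()).increment();
    }

    long rejected(Attributes attributes) {
        LongAdder count = delayedRejected.get(attributes);
        return count == null ? 0L : count.sum();
    }

    long overridden(Attributes attributes) {
        LongAdder count = fixedDelayOverridden.get(attributes);
        return count == null ? 0L : count.sum();
    }
}
```

Under the precedence rules in the High Level Design:

- `recordOverridden` would be called when a fixed delay replaces a client-specified `deliverAt`.
- `recordRejected` would be called when a message exceeds `max-delivery-delay`.

`LongAdder` keeps increments cheap under contention from many publishing threads.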
+
+# Monitoring
+Administrators can use the new `pulsar.broker.topic.messages.delayed.rejected` metric to monitor the health and usage of the delayed delivery feature.
+A sudden spike in this metric could indicate:
+1.  A producer application has been deployed with a misconfiguration, attempting to schedule messages with an overly long delay.
+2.  A recent change to the `max-delivery-delay` policy at the namespace or topic level is now affecting existing producers.
+
+A high or unexpected count for `pulsar.broker.topic.messages.fixed.delay.overridden` indicates that producer applications are sending messages with a `deliverAt` time to a topic that has a `fixed-delivery-delay` policy.
+While the policy is being enforced correctly, this metric helps operators identify clients that may be misconfigured or unaware of the topic's enforced behavior.
+
+# Security Considerations
+
+# Backward & Forward Compatibility
+
+## Upgrade
+
+<!--
+Specify the list of instructions, if there are such, needed to perform before/after upgrading to Pulsar version containing this feature.
+-->
+
+## Downgrade / Rollback
+Downgrading to a version without this feature is supported, but the new `fixed-delivery-delay` policy will no longer be enforced.
+-   **Behavior Change**: Topics revert to the previous behavior of either honoring the client's `deliverAt` time or enforcing the `max-delivery-delay` policy.
+-   **Clean Rollback**: Disable the `fixed-delivery-delay` policy on all relevant topics and namespaces *before* starting the rollback process. This can be done by setting the fixed delay value to `0` using the `pulsar-admin ... set-delayed-delivery` command.
+
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+The system shall guarantee that changes to delayed delivery policies are applied atomically across all active brokers.
+This ensures that the cluster does not operate in a mixed-policy state during either an upgrade or a downgrade procedure.
+During a geo-replication upgrade, ensure that all clusters are upgraded before relying on the consistency of the new delayed delivery policies.
+If downgrading a geo-replicated cluster, remove the new topic-level configurations *before* downgrading to prevent inconsistencies between clusters.
+
+# Alternatives
+
+<!--
+If there are alternatives that were already considered by the authors or, after the discussion, by the community, and were rejected, please list them here along with the reason why they were rejected.
+-->
+
+# General Notes
+
+# Links
+
+<!--
+Updated afterwards
+-->
+* Mailing List discussion thread: https://lists.apache.org/thread/23t4zzbfjrm5r4pbzrofn3d85c0171yn
+* Mailing List voting thread: https://lists.apache.org/thread/m8y5brc0o8fgcby1sop3hybjpm46xgrr
