Copilot commented on code in PR #23770:
URL: https://github.com/apache/pulsar/pull/23770#discussion_r2917454695


##########
pip/pip-398.md:
##########
@@ -0,0 +1,278 @@
+# PIP-398: Subscription replication on the broker, namespace and topic levels
+
+# Background knowledge
+
+https://github.com/apache/pulsar/pull/4299 introduces the subscription 
replication feature on the consumer level:
+
+```java
+Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("topic").replicateSubscriptionState(true /* 
false */)
+        .subscriptionName("sub").subscribe();
+```
+
+While this provides flexibility, it introduces overhead in managing 
replication for a large number of consumers. Users
+need to manually enable the `replicateSubscriptionState` flag for each 
consumer, which can become cumbersome in
+large-scale deployments.
+
+# Motivation
+
+The key motivation behind this PIP is to simplify subscription replication 
configuration, especially in failover
+scenarios. When a main cluster goes down and a backup cluster is activated, 
ensuring that subscription states are
+consistently replicated across clusters is critical for failover scenarios. By 
extending the replication configuration
+to the broker, namespace and topic levels, the system reduces the need for 
explicit consumer-level configuration.
+
+# Goals
+
+## In Scope
+
+- Provide management of subscription replication at the broker, namespace and 
topic levels using the Pulsar Admin CLI
+  and API.
+- Define the priority order for subscription replication configuration across 
different levels.
+
+## Out of Scope
+
+- Changes to the existing consumer-level `replicateSubscriptionState` 
configuration.
+- Changes to the subscription replication mechanism itself (how states are 
replicated across clusters).
+
+# High Level Design
+
+This PIP introduces the `replicateSubscriptionState` configuration at the 
**broker, namespace, and topic levels** to
+simplify the management of subscription replication.
+
+With this proposal, administrators can configure subscription replication at 
higher levels so that consumers
+automatically inherit the effective configuration.
+
+## Subscription Replication Evaluation
+
+Subscription replication policies follow Pulsar's hierarchical configuration 
model:
+
+```
+Topic > Namespace > Broker
+```
+
+The **highest level with a non-null value** determines the effective policy; 
lower levels are ignored.
+
+* **Broker, Namespace, and Topic levels** follow **enable-only semantics**:
+    * `true` → enables subscription replication.
+    * `false` → does **not disable** existing replicated subscriptions.
+    * `null` → not configured; continue to the next lower level.
+
+* **Consumer configuration** participates in the evaluation but is **not part 
of the policy hierarchy**:
+    * `true` → enables replication.
+    * `false` or `null` → does **not disable** replication; inherits the 
effective configuration from higher-level
+      policies.
+
+**Evaluation flow for each subscription:**
+
+1. Traverse `Topic → Namespace → Broker` to find the first non-null policy.
+2. If the effective policy is `true`, mark the subscription as replicated.
+3. Otherwise, evaluate the consumer configuration:
+    * `true` → enables replication.
+    * `false` or `null` → no change; existing replicated state is preserved.
+4. Explicitly **disabling a subscription** is only possible via **Pulsar 
Admin**, and succeeds **only if the effective policy is not `true`**.  
+   If a higher-level policy later becomes `true` (topic reload, 
namespace/broker update), the broker re-applies the effective policy and marks 
the subscription as replicated again.
+
+### Design Consistency with Consumer-level Logic
+
+This design builds upon the existing **consumer-level logic**, which already 
follows **enable-only semantics**:
+
+```java
+// 
https://github.com/apache/pulsar/blob/8798a4662300dfa3d330dbb0277073874cc2d86e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1153-L1156
+if (replicated != null && replicated && !subscription.isReplicated()) {
+    subscription.setReplicated(true);
+}
+```
+
+Key points:
+
+* **Consumer-level default changed** from `false` to `null` ([PR 
#23757](https://github.com/apache/pulsar/pull/23757)).
+  * Combined with the multi-level policy evaluation, this ensures 
**compatibility across all client languages and versions**:
+      * Older clients defaulting to `false`
+      * Newer clients defaulting to `null`
+  * Both behave correctly with **broker, namespace, and topic-level policies**.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Broker level
+
+Add the field `replicateSubscriptionState` to the 
`org.apache.pulsar.broker.ServiceConfiguration` class
+to control subscription replication at the broker level:
+
+```java
+public class ServiceConfiguration implements PulsarConfiguration {
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            required = false,
+            dynamic = false,
+            doc = "The default value for replicating subscription state."
+    )
+    private Boolean replicateSubscriptionState;
+}
+```
+
+### Namespace level
+
+1. Add the field `replicate_subscription_state` to the 
`org.apache.pulsar.common.policies.data.Policies` class
+   to control subscription replication at the namespace level:
+    ```java
+    public class Policies {
+        @SuppressWarnings("checkstyle:MemberName")
+        public Boolean replicate_subscription_state;
+    }
+    ```
+2. Add the management methods to the 
`org.apache.pulsar.client.admin.Namespaces` interface:
+     ```java
+     public interface Namespaces {
+         void setReplicateSubscriptionState(String namespace, boolean enabled) 
throws PulsarAdminException;
+         CompletableFuture<Void> setReplicateSubscriptionStateAsync(String 
namespace, boolean enabled);
+         Boolean getReplicateSubscriptionState(String namespace) throws 
PulsarAdminException;
+         CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String 
namespace);
+         void removeReplicateSubscriptionState(String namespace) throws 
PulsarAdminException;
+         CompletableFuture<Void> removeReplicateSubscriptionStateAsync(String 
namespace);
+     }
+     ```
+3. Implement the management methods in the 
`org.apache.pulsar.client.admin.internal.NamespacesImpl` class.
+
+### Topic level
+
+1. Add the field `Boolean replicateSubscriptionState` to the 
`org.apache.pulsar.common.policies.data.TopicPolicies`
+   class to enable subscription replication at the topic level:
+   ```java
+    public class TopicPolicies {
+         public Boolean replicateSubscriptionState;
+    }
+    ```
+2. Add the management methods to the 
`org.apache.pulsar.client.admin.TopicPolicies` interface:
+   ```java
+       public interface TopicPolicies {
+           void setReplicateSubscriptionState(String topic, boolean enabled) 
throws PulsarAdminException;
+           CompletableFuture<Void> setReplicateSubscriptionStateAsync(String 
topic, boolean enabled);
+           Boolean getReplicateSubscriptionState(String topic, boolean 
applied) throws PulsarAdminException;
+           CompletableFuture<Boolean> 
getReplicateSubscriptionStateAsync(String topic, boolean applied);
+           void removeReplicateSubscriptionState(String topic) throws 
PulsarAdminException;
+           CompletableFuture<Void> 
removeReplicateSubscriptionStateAsync(String topic);
+       }
+   ```
+3. Implement the management methods in the 
`org.apache.pulsar.client.admin.internal.TopicPoliciesImpl` class.
+
+### Consumer level
+
+No changes. When the consumer with `replicateSubscriptionState=true`, the 
subscription will be snapshot. If `false`, no
+operation will be performed.
+
+## Public-facing Changes
+
+### Public API
+
+> **Note:** Setting `false` at namespace or topic level is accepted but not 
applied (enable-only semantics). Use the
+`DELETE` endpoint or `remove-` command to clear the configuration instead.
+
+##### Namespace level
+
+- `POST /{tenant}/{namespace}/replicateSubscriptionState`: set the 
subscription replication configuration on the
+  namespace level.
+    - Content-Type: `application/json`
+    - Body: `true` or `false`.
+- `GET /{tenant}/{namespace}/replicateSubscriptionState`: get subscription 
replication configuration on the namespace
+  level.
+    - Response: `true`, `false`, or `null` (not configured)
+- `DELETE /{tenant}/{namespace}/replicateSubscriptionState`: remove the 
subscription replication configuration on the
+  namespace level.
+
+##### Topic level
+
+- `POST /{tenant}/{namespace}/{topic}/replicateSubscriptionState`: set the 
subscription replication configuration on
+  the topic level.
+    - Content-Type: `application/json`
+    - Body: `true` or `false`.
+- `GET /{tenant}/{namespace}/{topic}/replicateSubscriptionState`: get 
subscription replication configuration on the
+  topic level.
+    - Parameters:
+        - `applied=true`: get the effective configuration following `Topic > 
Namespace > Broker` precedence.
+        - `applied=false`: get the topic level configuration only.
+    - Response: `true`, `false`, or `null` (not configured)
+- `DELETE /{tenant}/{namespace}/{topic}/replicateSubscriptionState`: remove 
the subscription replication configuration
+  on the topic level.
+
+### CLI
+
+#### Namespace level
+
+- `pulsar-admin namespaces set-replicate-subscription-state 
<tenant>/<namespace> --enabled true/false` to
+  set the subscription replication on the namespace level.
+- `pulsar-admin namespaces get-replicate-subscription-state 
<tenant>/<namespace>` to get the subscription
+  replication configuration on the namespace level.
+- `pulsar-admin namespaces remove-replicate-subscription-state 
<tenant>/<namespace>` to remove the subscription
+  replication configuration on the namespace level.

Review Comment:
   The proposed namespace CLI uses `--enabled true/false`, but existing 
`pulsar-admin namespaces` “enable/disable” commands typically use mutually 
exclusive `--enable` / `--disable` flags (see 
`CmdNamespaces.SetDeduplication`). Consider aligning the new command options 
with the established CLI pattern for consistency.
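
As a rough illustration of the mutually exclusive flag pattern mentioned above, here is a minimal, library-free sketch. The class `EnableDisableFlagSketch` and the method `parseEnabled` are hypothetical names made up for this example; this is not how `CmdNamespaces` is actually implemented, and the real command would presumably rely on the CLI framework's own option handling.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a CLI option parser; it does not use Pulsar's real command classes.
final class EnableDisableFlagSketch {

    /** Returns true for --enable and false for --disable; rejects both flags or neither. */
    static boolean parseEnabled(String... args) {
        List<String> argList = Arrays.asList(args);
        boolean enable = argList.contains("--enable");
        boolean disable = argList.contains("--disable");
        // Exactly one of the two flags must be present.
        if (enable == disable) {
            throw new IllegalArgumentException("Specify exactly one of --enable or --disable");
        }
        return enable;
    }

    public static void main(String[] args) {
        // Example invocation with a placeholder namespace argument.
        System.out.println(parseEnabled("tenant/namespace", "--enable"));
    }
}
```

With this shape, omitting both flags or passing both is rejected up front instead of silently defaulting, which is the main practical difference from a single `--enabled true/false` option.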



##########
pip/pip-398.md:
##########
@@ -0,0 +1,278 @@
+# PIP-398: Subscription replication on the broker, namespace and topic levels
+
+# Background knowledge
+
+https://github.com/apache/pulsar/pull/4299 introduces the subscription 
replication feature on the consumer level:
+
+```java
+Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("topic").replicateSubscriptionState(true /* 
false */)
+        .subscriptionName("sub").subscribe();
+```
+
+While this provides flexibility, it introduces overhead in managing 
replication for a large number of consumers. Users
+need to manually enable the `replicateSubscriptionState` flag for each 
consumer, which can become cumbersome in
+large-scale deployments.
+
+# Motivation
+
+The key motivation behind this PIP is to simplify subscription replication 
configuration, especially in failover
+scenarios. When a main cluster goes down and a backup cluster is activated, 
ensuring that subscription states are
+consistently replicated across clusters is critical for failover scenarios. By 
extending the replication configuration
+to the broker, namespace and topic levels, the system reduces the need for 
explicit consumer-level configuration.
+
+# Goals
+
+## In Scope
+
+- Provide management of subscription replication at the broker, namespace and 
topic levels using the Pulsar Admin CLI
+  and API.
+- Define the priority order for subscription replication configuration across 
different levels.
+
+## Out of Scope
+
+- Changes to the existing consumer-level `replicateSubscriptionState` 
configuration.
+- Changes to the subscription replication mechanism itself (how states are 
replicated across clusters).
+
+# High Level Design
+
+This PIP introduces the `replicateSubscriptionState` configuration at the 
**broker, namespace, and topic levels** to
+simplify the management of subscription replication.
+
+With this proposal, administrators can configure subscription replication at 
higher levels so that consumers
+automatically inherit the effective configuration.
+
+## Subscription Replication Evaluation
+
+Subscription replication policies follow Pulsar's hierarchical configuration 
model:
+
+```
+Topic > Namespace > Broker
+```
+
+The **highest level with a non-null value** determines the effective policy; 
lower levels are ignored.
+
+* **Broker, Namespace, and Topic levels** follow **enable-only semantics**:
+    * `true` → enables subscription replication.
+    * `false` → does **not disable** existing replicated subscriptions.
+    * `null` → not configured; continue to the next lower level.
+
+* **Consumer configuration** participates in the evaluation but is **not part 
of the policy hierarchy**:
+    * `true` → enables replication.
+    * `false` or `null` → does **not disable** replication; inherits the 
effective configuration from higher-level
+      policies.
+
+**Evaluation flow for each subscription:**
+
+1. Traverse `Topic → Namespace → Broker` to find the first non-null policy.
+2. If the effective policy is `true`, mark the subscription as replicated.
+3. Otherwise, evaluate the consumer configuration:
+    * `true` → enables replication.
+    * `false` or `null` → no change; existing replicated state is preserved.
+4. Explicitly **disabling a subscription** is only possible via **Pulsar 
Admin**, and succeeds **only if the effective policy is not `true`**.  
+   If a higher-level policy later becomes `true` (topic reload, 
namespace/broker update), the broker re-applies the effective policy and marks 
the subscription as replicated again.
+
+### Design Consistency with Consumer-level Logic
+
+This design builds upon the existing **consumer-level logic**, which already 
follows **enable-only semantics**:
+
+```java
+// 
https://github.com/apache/pulsar/blob/8798a4662300dfa3d330dbb0277073874cc2d86e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1153-L1156
+if (replicated != null && replicated && !subscription.isReplicated()) {
+    subscription.setReplicated(true);
+}
+```
+
+Key points:
+
+* **Consumer-level default changed** from `false` to `null` ([PR 
#23757](https://github.com/apache/pulsar/pull/23757)).
+  * Combined with the multi-level policy evaluation, this ensures 
**compatibility across all client languages and versions**:
+      * Older clients defaulting to `false`
+      * Newer clients defaulting to `null`
+  * Both behave correctly with **broker, namespace, and topic-level policies**.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Broker level
+
+Add the field `replicateSubscriptionState` to the 
`org.apache.pulsar.broker.ServiceConfiguration` class
+to control subscription replication at the broker level:
+
+```java
+public class ServiceConfiguration implements PulsarConfiguration {
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            required = false,
+            dynamic = false,
+            doc = "The default value for replicating subscription state."
+    )
+    private Boolean replicateSubscriptionState;
+}
+```
+
+### Namespace level
+
+1. Add the field `replicate_subscription_state` to the 
`org.apache.pulsar.common.policies.data.Policies` class
+   to control subscription replication at the namespace level:
+    ```java
+    public class Policies {
+        @SuppressWarnings("checkstyle:MemberName")
+        public Boolean replicate_subscription_state;
+    }
+    ```
+2. Add the management methods to the 
`org.apache.pulsar.client.admin.Namespaces` interface:
+     ```java
+     public interface Namespaces {
+         void setReplicateSubscriptionState(String namespace, boolean enabled) 
throws PulsarAdminException;
+         CompletableFuture<Void> setReplicateSubscriptionStateAsync(String 
namespace, boolean enabled);
+         Boolean getReplicateSubscriptionState(String namespace) throws 
PulsarAdminException;
+         CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String 
namespace);
+         void removeReplicateSubscriptionState(String namespace) throws 
PulsarAdminException;
+         CompletableFuture<Void> removeReplicateSubscriptionStateAsync(String 
namespace);
+     }
+     ```
+3. Implement the management methods in the 
`org.apache.pulsar.client.admin.internal.NamespacesImpl` class.
+
+### Topic level
+
+1. Add the field `Boolean replicateSubscriptionState` to the 
`org.apache.pulsar.common.policies.data.TopicPolicies`
+   class to enable subscription replication at the topic level:
+   ```java
+    public class TopicPolicies {
+         public Boolean replicateSubscriptionState;
+    }
+    ```
+2. Add the management methods to the 
`org.apache.pulsar.client.admin.TopicPolicies` interface:
+   ```java
+       public interface TopicPolicies {
+           void setReplicateSubscriptionState(String topic, boolean enabled) 
throws PulsarAdminException;
+           CompletableFuture<Void> setReplicateSubscriptionStateAsync(String 
topic, boolean enabled);
+           Boolean getReplicateSubscriptionState(String topic, boolean 
applied) throws PulsarAdminException;
+           CompletableFuture<Boolean> 
getReplicateSubscriptionStateAsync(String topic, boolean applied);
+           void removeReplicateSubscriptionState(String topic) throws 
PulsarAdminException;
+           CompletableFuture<Void> 
removeReplicateSubscriptionStateAsync(String topic);
+       }
+   ```
+3. Implement the management methods in the 
`org.apache.pulsar.client.admin.internal.TopicPoliciesImpl` class.
+
+### Consumer level
+
+No changes. When the consumer with `replicateSubscriptionState=true`, the 
subscription will be snapshot. If `false`, no

Review Comment:
   This sentence is grammatically incorrect and a bit unclear: “the 
subscription will be snapshot”. Consider rephrasing to “a snapshot will be 
taken” / “the subscription state will be snapshotted” to match the actual 
behavior being described.
   ```suggestion
   No changes. When the consumer has `replicateSubscriptionState=true`, the 
subscription state will be snapshotted. If `false`, no
   ```
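
To make the consumer-level behavior easier to reason about alongside the evaluation flow quoted earlier in this hunk, the following sketch models the enable-only semantics as the PIP text describes them. The class and method names (`ReplicationEvaluationSketch`, `effectivePolicy`, `shouldBeReplicated`) are hypothetical and only mirror the written steps; the broker's actual implementation may differ.

```java
// Hypothetical model of the evaluation flow described in the PIP; not Pulsar code.
final class ReplicationEvaluationSketch {

    /** Step 1: first non-null value in Topic > Namespace > Broker order, or null if none is set. */
    static Boolean effectivePolicy(Boolean topic, Boolean namespace, Boolean broker) {
        if (topic != null) {
            return topic;
        }
        if (namespace != null) {
            return namespace;
        }
        return broker; // may be null when nothing is configured at any level
    }

    /** Steps 2 and 3: replication can be switched on here, but never switched off. */
    static boolean shouldBeReplicated(Boolean policy, Boolean consumerFlag, boolean currentlyReplicated) {
        if (Boolean.TRUE.equals(policy)) {
            return true; // effective policy enables replication
        }
        if (Boolean.TRUE.equals(consumerFlag)) {
            return true; // consumer explicitly enables replication
        }
        return currentlyReplicated; // false or null: existing state is preserved
    }

    public static void main(String[] args) {
        Boolean policy = effectivePolicy(null, Boolean.TRUE, Boolean.FALSE);
        System.out.println(shouldBeReplicated(policy, null, false));
    }
}
```

Under this reading, a consumer passing `false` behaves the same as one passing `null`, which matches the "no operation will be performed" wording in the sentence under review.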



##########
pip/pip-398.md:
##########
@@ -0,0 +1,278 @@
+# PIP-398: Subscription replication on the broker, namespace and topic levels
+
+# Background knowledge
+
+https://github.com/apache/pulsar/pull/4299 introduces the subscription 
replication feature on the consumer level:
+
+```java
+Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("topic").replicateSubscriptionState(true /* 
false */)
+        .subscriptionName("sub").subscribe();
+```
+
+While this provides flexibility, it introduces overhead in managing 
replication for a large number of consumers. Users
+need to manually enable the `replicateSubscriptionState` flag for each 
consumer, which can become cumbersome in
+large-scale deployments.
+
+# Motivation
+
+The key motivation behind this PIP is to simplify subscription replication 
configuration, especially in failover
+scenarios. When a main cluster goes down and a backup cluster is activated, 
ensuring that subscription states are
+consistently replicated across clusters is critical for failover scenarios. By 
extending the replication configuration
+to the broker, namespace and topic levels, the system reduces the need for 
explicit consumer-level configuration.
+
+# Goals
+
+## In Scope
+
+- Provide management of subscription replication at the broker, namespace and 
topic levels using the Pulsar Admin CLI
+  and API.
+- Define the priority order for subscription replication configuration across 
different levels.
+
+## Out of Scope
+
+- Changes to the existing consumer-level `replicateSubscriptionState` 
configuration.
+- Changes to the subscription replication mechanism itself (how states are 
replicated across clusters).
+
+# High Level Design
+
+This PIP introduces the `replicateSubscriptionState` configuration at the 
**broker, namespace, and topic levels** to
+simplify the management of subscription replication.
+
+With this proposal, administrators can configure subscription replication at 
higher levels so that consumers
+automatically inherit the effective configuration.
+
+## Subscription Replication Evaluation
+
+Subscription replication policies follow Pulsar's hierarchical configuration 
model:
+
+```
+Topic > Namespace > Broker
+```
+
+The **highest level with a non-null value** determines the effective policy; 
lower levels are ignored.
+
+* **Broker, Namespace, and Topic levels** follow **enable-only semantics**:
+    * `true` → enables subscription replication.
+    * `false` → does **not disable** existing replicated subscriptions.
+    * `null` → not configured; continue to the next lower level.
+
+* **Consumer configuration** participates in the evaluation but is **not part 
of the policy hierarchy**:
+    * `true` → enables replication.
+    * `false` or `null` → does **not disable** replication; inherits the 
effective configuration from higher-level
+      policies.
+
+**Evaluation flow for each subscription:**
+
+1. Traverse `Topic → Namespace → Broker` to find the first non-null policy.
+2. If the effective policy is `true`, mark the subscription as replicated.
+3. Otherwise, evaluate the consumer configuration:
+    * `true` → enables replication.
+    * `false` or `null` → no change; existing replicated state is preserved.
+4. Explicitly **disabling a subscription** is only possible via **Pulsar 
Admin**, and succeeds **only if the effective policy is not `true`**.  
+   If a higher-level policy later becomes `true` (topic reload, 
namespace/broker update), the broker re-applies the effective policy and marks 
the subscription as replicated again.
+
+### Design Consistency with Consumer-level Logic
+
+This design builds upon the existing **consumer-level logic**, which already 
follows **enable-only semantics**:
+
+```java
+// 
https://github.com/apache/pulsar/blob/8798a4662300dfa3d330dbb0277073874cc2d86e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1153-L1156
+if (replicated != null && replicated && !subscription.isReplicated()) {
+    subscription.setReplicated(true);
+}
+```
+
+Key points:
+
+* **Consumer-level default changed** from `false` to `null` ([PR 
#23757](https://github.com/apache/pulsar/pull/23757)).
+  * Combined with the multi-level policy evaluation, this ensures 
**compatibility across all client languages and versions**:
+      * Older clients defaulting to `false`
+      * Newer clients defaulting to `null`
+  * Both behave correctly with **broker, namespace, and topic-level policies**.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Broker level
+
+Add the field `replicateSubscriptionState` to the 
`org.apache.pulsar.broker.ServiceConfiguration` class
+to control subscription replication at the broker level:
+
+```java
+public class ServiceConfiguration implements PulsarConfiguration {
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            required = false,
+            dynamic = false,
+            doc = "The default value for replicating subscription state."
+    )
+    private Boolean replicateSubscriptionState;
+}
+```
+
+### Namespace level
+
+1. Add the field `replicate_subscription_state` to the 
`org.apache.pulsar.common.policies.data.Policies` class
+   to control subscription replication at the namespace level:
+    ```java
+    public class Policies {
+        @SuppressWarnings("checkstyle:MemberName")
+        public Boolean replicate_subscription_state;
+    }
+    ```
+2. Add the management methods to the 
`org.apache.pulsar.client.admin.Namespaces` interface:
+     ```java
+     public interface Namespaces {
+         void setReplicateSubscriptionState(String namespace, boolean enabled) 
throws PulsarAdminException;
+         CompletableFuture<Void> setReplicateSubscriptionStateAsync(String 
namespace, boolean enabled);
+         Boolean getReplicateSubscriptionState(String namespace) throws 
PulsarAdminException;
+         CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String 
namespace);
+         void removeReplicateSubscriptionState(String namespace) throws 
PulsarAdminException;
+         CompletableFuture<Void> removeReplicateSubscriptionStateAsync(String 
namespace);
+     }
+     ```
+3. Implement the management methods in the 
`org.apache.pulsar.client.admin.internal.NamespacesImpl` class.
+
+### Topic level
+
+1. Add the field `Boolean replicateSubscriptionState` to the 
`org.apache.pulsar.common.policies.data.TopicPolicies`
+   class to enable subscription replication at the topic level:
+   ```java
+    public class TopicPolicies {
+         public Boolean replicateSubscriptionState;
+    }
+    ```
+2. Add the management methods to the 
`org.apache.pulsar.client.admin.TopicPolicies` interface:
+   ```java
+       public interface TopicPolicies {
+           void setReplicateSubscriptionState(String topic, boolean enabled) 
throws PulsarAdminException;
+           CompletableFuture<Void> setReplicateSubscriptionStateAsync(String 
topic, boolean enabled);
+           Boolean getReplicateSubscriptionState(String topic, boolean 
applied) throws PulsarAdminException;
+           CompletableFuture<Boolean> 
getReplicateSubscriptionStateAsync(String topic, boolean applied);
+           void removeReplicateSubscriptionState(String topic) throws 
PulsarAdminException;
+           CompletableFuture<Void> 
removeReplicateSubscriptionStateAsync(String topic);
+       }
+   ```
+3. Implement the management methods in the 
`org.apache.pulsar.client.admin.internal.TopicPoliciesImpl` class.
+
+### Consumer level
+
+No changes. When the consumer with `replicateSubscriptionState=true`, the 
subscription will be snapshot. If `false`, no
+operation will be performed.
+
+## Public-facing Changes
+
+### Public API
+
+> **Note:** Setting `false` at namespace or topic level is accepted but not 
applied (enable-only semantics). Use the
+`DELETE` endpoint or `remove-` command to clear the configuration instead.
+
+##### Namespace level
+
+- `POST /{tenant}/{namespace}/replicateSubscriptionState`: set the 
subscription replication configuration on the
+  namespace level.
+    - Content-Type: `application/json`
+    - Body: `true` or `false`.
+- `GET /{tenant}/{namespace}/replicateSubscriptionState`: get subscription 
replication configuration on the namespace
+  level.
+    - Response: `true`, `false`, or `null` (not configured)
+- `DELETE /{tenant}/{namespace}/replicateSubscriptionState`: remove the 
subscription replication configuration on the
+  namespace level.
+
+##### Topic level
+
+- `POST /{tenant}/{namespace}/{topic}/replicateSubscriptionState`: set the 
subscription replication configuration on
+  the topic level.
+    - Content-Type: `application/json`
+    - Body: `true` or `false`.
+- `GET /{tenant}/{namespace}/{topic}/replicateSubscriptionState`: get 
subscription replication configuration on the
+  topic level.
+    - Parameters:
+        - `applied=true`: get the effective configuration following `Topic > 
Namespace > Broker` precedence.
+        - `applied=false`: get the topic level configuration only.
+    - Response: `true`, `false`, or `null` (not configured)
+- `DELETE /{tenant}/{namespace}/{topic}/replicateSubscriptionState`: remove 
the subscription replication configuration
+  on the topic level.
+
+### CLI
+
+#### Namespace level
+
+- `pulsar-admin namespaces set-replicate-subscription-state 
<tenant>/<namespace> --enabled true/false` to
+  set the subscription replication on the namespace level.
+- `pulsar-admin namespaces get-replicate-subscription-state 
<tenant>/<namespace>` to get the subscription
+  replication configuration on the namespace level.
+- `pulsar-admin namespaces remove-replicate-subscription-state 
<tenant>/<namespace>` to remove the subscription
+  replication configuration on the namespace level.
+
+#### Topic level
+
+- `pulsar-admin topicPolicies set-replicate-subscription-state 
<tenant>/<namespace>/<topic> --enabled true/false`
+  to set the subscription replication on the topic level.
+- `pulsar-admin topicPolicies get-replicate-subscription-state 
<tenant>/<namespace>/<topic>` to get the
+  subscription replication configuration on the topic level.
+- `pulsar-admin topicPolicies remove-replicate-subscription-state 
<tenant>/<namespace>/<topic>` to remove the
+  subscription replication configuration on the topic level.
+
+# Security Considerations
+
+Both write and read operations require the necessary permissions, which 
already exist in Pulsar.
+
+## Namespace level
+
+- Write the subscription replication configuration:
+    - Required: `PolicyName.REPLICATED_SUBSCRIPTION` with `WRITE` permission.
+- Read the subscription replication configuration:
+    - Required: `PolicyName.REPLICATED_SUBSCRIPTION` with `READ` permission.

Review Comment:
   `PolicyName.REPLICATED_SUBSCRIPTION` does not exist in the current codebase 
(`PolicyName` enum has no such constant). Either the PIP should reference the 
correct existing policy name/authorization check for namespace-level 
operations, or explicitly state that a new `PolicyName` value (and 
corresponding enforcement) will be introduced.
   ```suggestion
   Both write and read operations require appropriate Pulsar authorization 
checks.
   
   ## Namespace level
   
   - Write the subscription replication configuration:
       - Required: sufficient namespace-level permission to modify namespace 
policies (for example, the same permission used for other namespace policy 
updates) with `WRITE` access.
   - Read the subscription replication configuration:
       - Required: sufficient namespace-level permission to view namespace 
policies with `READ` access.
   ```



##########
pip/pip-398.md:
##########
@@ -0,0 +1,278 @@
+# PIP-398: Subscription replication on the broker, namespace and topic levels
+
+# Background knowledge
+
+https://github.com/apache/pulsar/pull/4299 introduces the subscription 
replication feature on the consumer level:
+
+```java
+Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("topic").replicateSubscriptionState(true /* 
false */)
+        .subscriptionName("sub").subscribe();
+```
+
+While this provides flexibility, it introduces overhead in managing 
replication for a large number of consumers. Users
+need to manually enable the `replicateSubscriptionState` flag for each 
consumer, which can become cumbersome in
+large-scale deployments.
+
+# Motivation
+
+The key motivation behind this PIP is to simplify subscription replication 
configuration, especially in failover
+scenarios. When a main cluster goes down and a backup cluster is activated, 
ensuring that subscription states are
+consistently replicated across clusters is critical for failover scenarios. By 
extending the replication configuration
+to the broker, namespace and topic levels, the system reduces the need for 
explicit consumer-level configuration.
+
+# Goals
+
+## In Scope
+
+- Provide management of subscription replication at the broker, namespace and 
topic levels using the Pulsar Admin CLI
+  and API.
+- Define the priority order for subscription replication configuration across 
different levels.
+
+## Out of Scope
+
+- Changes to the existing consumer-level `replicateSubscriptionState` 
configuration.
+- Changes to the subscription replication mechanism itself (how states are 
replicated across clusters).
+
+# High Level Design
+
+This PIP introduces the `replicateSubscriptionState` configuration at the 
**broker, namespace, and topic levels** to
+simplify the management of subscription replication.
+
+With this proposal, administrators can configure subscription replication at 
higher levels so that consumers
+automatically inherit the effective configuration.
+
+## Subscription Replication Evaluation
+
+Subscription replication policies follow Pulsar's hierarchical configuration 
model:
+
+```
+Topic > Namespace > Broker
+```
+
+The **highest level with a non-null value** determines the effective policy; 
lower levels are ignored.
+
+* **Broker, Namespace, and Topic levels** follow **enable-only semantics**:
+    * `true` → enables subscription replication.
+    * `false` → does **not disable** existing replicated subscriptions.
+    * `null` → not configured; continue to the next lower level.
+
+* **Consumer configuration** participates in the evaluation but is **not part 
of the policy hierarchy**:
+    * `true` → enables replication.
+    * `false` or `null` → does **not disable** replication; inherits the 
effective configuration from higher-level
+      policies.
+
+**Evaluation flow for each subscription:**
+
+1. Traverse `Topic → Namespace → Broker` to find the first non-null policy.
+2. If the effective policy is `true`, mark the subscription as replicated.
+3. Otherwise, evaluate the consumer configuration:
+    * `true` → enables replication.
+    * `false` or `null` → no change; existing replicated state is preserved.
+4. Explicitly **disabling a subscription** is only possible via **Pulsar 
Admin**, and succeeds **only if the effective policy is not `true`**.  
+   If a higher-level policy later becomes `true` (topic reload, 
namespace/broker update), the broker re-applies the effective policy and marks 
the subscription as replicated again.
+
+### Design Consistency with Consumer-level Logic
+
+This design builds upon the existing **consumer-level logic**, which already 
follows **enable-only semantics**:
+
+```java
+// 
https://github.com/apache/pulsar/blob/8798a4662300dfa3d330dbb0277073874cc2d86e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1153-L1156
+if (replicated != null && replicated && !subscription.isReplicated()) {
+    subscription.setReplicated(true);
+}
+```
+
+Key points:
+
+* **Consumer-level default changed** from `false` to `null` ([PR 
#23757](https://github.com/apache/pulsar/pull/23757)).
+  * Combined with the multi-level policy evaluation, this ensures 
**compatibility across all client languages and versions**:
+      * Older clients defaulting to `false`
+      * Newer clients defaulting to `null`
+  * Both behave correctly with **broker, namespace, and topic-level policies**.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Broker level
+
+Add the field `replicateSubscriptionState` to the 
`org.apache.pulsar.broker.ServiceConfiguration` class
+to control subscription replication at the broker level:
+
+```java
+public class ServiceConfiguration implements PulsarConfiguration {
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            required = false,
+            dynamic = false,
+            doc = "The default value for replicating subscription state."
+    )
+    private Boolean replicateSubscriptionState;
+}
+```
+
+### Namespace level
+
+1. Add the field `replicate_subscription_state` to the 
`org.apache.pulsar.common.policies.data.Policies` class
+   to control subscription replication at the namespace level:
+    ```java
+    public class Policies {
+        @SuppressWarnings("checkstyle:MemberName")
+        public Boolean replicate_subscription_state;
+    }
+    ```
+2. Add the management methods to the 
`org.apache.pulsar.client.admin.Namespaces` interface:
+     ```java
+     public interface Namespaces {
+         void setReplicateSubscriptionState(String namespace, boolean enabled) 
throws PulsarAdminException;
+         CompletableFuture<Void> setReplicateSubscriptionStateAsync(String 
namespace, boolean enabled);
+         Boolean getReplicateSubscriptionState(String namespace) throws 
PulsarAdminException;
+         CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String 
namespace);
+         void removeReplicateSubscriptionState(String namespace) throws 
PulsarAdminException;
+         CompletableFuture<Void> removeReplicateSubscriptionStateAsync(String 
namespace);
+     }
+     ```
+3. Implement the management methods in the 
`org.apache.pulsar.client.admin.internal.NamespacesImpl` class.
+
+### Topic level
+
+1. Add the field `Boolean replicateSubscriptionState` to the 
`org.apache.pulsar.common.policies.data.TopicPolicies`
+   class to enable subscription replication at the topic level:
+   ```java
+    public class TopicPolicies {
+         public Boolean replicateSubscriptionState;
+    }
+    ```
+2. Add the management methods to the 
`org.apache.pulsar.client.admin.TopicPolicies` interface:
+   ```java
+       public interface TopicPolicies {
+           void setReplicateSubscriptionState(String topic, boolean enabled) 
throws PulsarAdminException;
+           CompletableFuture<Void> setReplicateSubscriptionStateAsync(String 
topic, boolean enabled);
+           Boolean getReplicateSubscriptionState(String topic, boolean 
applied) throws PulsarAdminException;
+           CompletableFuture<Boolean> 
getReplicateSubscriptionStateAsync(String topic, boolean applied);
+           void removeReplicateSubscriptionState(String topic) throws 
PulsarAdminException;
+           CompletableFuture<Void> 
removeReplicateSubscriptionStateAsync(String topic);
+       }
+   ```
+3. Implement the management methods in the 
`org.apache.pulsar.client.admin.internal.TopicPoliciesImpl` class.
+
+### Consumer level
+
+No changes. When the consumer with `replicateSubscriptionState=true`, the 
subscription will be snapshot. If `false`, no
+operation will be performed.
+
+## Public-facing Changes
+
+### Public API
+
+> **Note:** Setting `false` at namespace or topic level is accepted but not 
applied (enable-only semantics). Use the
+`DELETE` endpoint or `remove-` command to clear the configuration instead.
+
+##### Namespace level
+
+- `POST /{tenant}/{namespace}/replicateSubscriptionState`: set the 
subscription replication configuration on the
+  namespace level.
+    - Content-Type: `application/json`
+    - Body: `true` or `false`.
+- `GET /{tenant}/{namespace}/replicateSubscriptionState`: get subscription 
replication configuration on the namespace
+  level.
+    - Response: `true`, `false`, or `null` (not configured)
+- `DELETE /{tenant}/{namespace}/replicateSubscriptionState`: remove the 
subscription replication configuration on the
+  namespace level.
+
+##### Topic level
+
+- `POST /{tenant}/{namespace}/{topic}/replicateSubscriptionState`: set the 
subscription replication configuration on
+  the topic level.
+    - Content-Type: `application/json`
+    - Body: `true` or `false`.
+- `GET /{tenant}/{namespace}/{topic}/replicateSubscriptionState`: get 
subscription replication configuration on the
+  topic level.
+    - Parameters:
+        - `applied=true`: get the effective configuration following `Topic > 
Namespace > Broker` precedence.
+        - `applied=false`: get the topic level configuration only.
+    - Response: `true`, `false`, or `null` (not configured)
+- `DELETE /{tenant}/{namespace}/{topic}/replicateSubscriptionState`: remove 
the subscription replication configuration
+  on the topic level.
+
+### CLI
+
+#### Namespace level
+
+- `pulsar-admin namespaces set-replicate-subscription-state 
<tenant>/<namespace> --enabled true/false` to
+  set the subscription replication on the namespace level.
+- `pulsar-admin namespaces get-replicate-subscription-state 
<tenant>/<namespace>` to get the subscription
+  replication configuration on the namespace level.
+- `pulsar-admin namespaces remove-replicate-subscription-state 
<tenant>/<namespace>` to remove the subscription
+  replication configuration on the namespace level.
+
+#### Topic level
+
+- `pulsar-admin topicPolicies set-replicate-subscription-state 
<tenant>/<namespace>/<topic> --enabled true/false`
+  to set the subscription replication on the topic level.
+- `pulsar-admin topicPolicies get-replicate-subscription-state 
<tenant>/<namespace>/<topic>` to get the
+  subscription replication configuration on the topic level.
+- `pulsar-admin topicPolicies remove-replicate-subscription-state 
<tenant>/<namespace>/<topic>` to remove the
+  subscription replication configuration on the topic level.

Review Comment:
   Similarly for `pulsar-admin topicPolicies ...`: existing topic policy 
setters generally use mutually exclusive `--enable` / `--disable` flags (see 
`CmdTopicPolicies.SetDeduplicationStatus`), not `--enabled true/false`. 
Aligning with the existing CLI option style will make the new commands more 
discoverable and consistent.
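
A minimal sketch of the flag style described above, written as a dependency-free Java helper rather than with Pulsar's actual command framework. `EnableDisableFlagSketch` and `parseEnabled` are hypothetical names used only for illustration; the point is that exactly one of `--enable` / `--disable` must be given, and that the pair resolves to a single boolean.

```java
import java.util.Arrays;
import java.util.List;

public class EnableDisableFlagSketch {

    // Hypothetical helper, not Pulsar's CLI code: resolves the mutually
    // exclusive --enable / --disable flags into a single boolean.
    static boolean parseEnabled(String... args) {
        List<String> argList = Arrays.asList(args);
        boolean enable = argList.contains("--enable");
        boolean disable = argList.contains("--disable");
        // Reject both "neither flag" and "both flags".
        if (enable == disable) {
            throw new IllegalArgumentException(
                    "Specify exactly one of --enable or --disable");
        }
        return enable;
    }

    public static void main(String[] args) {
        System.out.println(parseEnabled("--enable"));
        System.out.println(parseEnabled("--disable"));
    }
}
```

With this shape, the "not configured" state stays the job of the separate `remove-` command, so the setter never needs a third value.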



##########
pip/pip-398.md:
##########
@@ -0,0 +1,278 @@
+# PIP-398: Subscription replication on the broker, namespace and topic levels
+
+# Background knowledge
+
+https://github.com/apache/pulsar/pull/4299 introduces the subscription 
replication feature on the consumer level:
+
+```java
+Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("topic").replicateSubscriptionState(true /* 
false */)
+        .subscriptionName("sub").subscribe();
+```
+
+While this provides flexibility, it introduces overhead in managing 
replication for a large number of consumers. Users
+need to manually enable the `replicateSubscriptionState` flag for each 
consumer, which can become cumbersome in
+large-scale deployments.
+
+# Motivation
+
+The key motivation behind this PIP is to simplify subscription replication 
configuration, especially in failover
+scenarios. When a main cluster goes down and a backup cluster is activated, 
ensuring that subscription states are
+consistently replicated across clusters is critical for failover scenarios. By 
extending the replication configuration
+to the broker, namespace and topic levels, the system reduces the need for 
explicit consumer-level configuration.
+
+# Goals
+
+## In Scope
+
+- Provide management of subscription replication at the broker, namespace and 
topic levels using the Pulsar Admin CLI
+  and API.
+- Define the priority order for subscription replication configuration across 
different levels.
+
+## Out of Scope
+
+- Changes to the existing consumer-level `replicateSubscriptionState` 
configuration.
+- Changes to the subscription replication mechanism itself (how states are 
replicated across clusters).
+
+# High Level Design
+
+This PIP introduces the `replicateSubscriptionState` configuration at the 
**broker, namespace, and topic levels** to
+simplify the management of subscription replication.
+
+With this proposal, administrators can configure subscription replication at 
higher levels so that consumers
+automatically inherit the effective configuration.
+
+## Subscription Replication Evaluation
+
+Subscription replication policies follow Pulsar's hierarchical configuration 
model:
+
+```
+Topic > Namespace > Broker
+```
+
+The **highest level with a non-null value** determines the effective policy; 
lower levels are ignored.
+
+* **Broker, Namespace, and Topic levels** follow **enable-only semantics**:
+    * `true` → enables subscription replication.
+    * `false` → does **not disable** existing replicated subscriptions.
+    * `null` → not configured; continue to the next lower level.
+
+* **Consumer configuration** participates in the evaluation but is **not part 
of the policy hierarchy**:
+    * `true` → enables replication.
+    * `false` or `null` → does **not disable** replication; inherits the 
effective configuration from higher-level
+      policies.
+
+**Evaluation flow for each subscription:**
+
+1. Traverse `Topic → Namespace → Broker` to find the first non-null policy.
+2. If the effective policy is `true`, mark the subscription as replicated.
+3. Otherwise, evaluate the consumer configuration:
+    * `true` → enables replication.
+    * `false` or `null` → no change; existing replicated state is preserved.
+4. Explicitly **disabling a subscription** is only possible via **Pulsar 
Admin**, and succeeds **only if the effective policy is not `true`**.  
+   If a higher-level policy later becomes `true` (topic reload, 
namespace/broker update), the broker re-applies the effective policy and marks 
the subscription as replicated again.
+
+### Design Consistency with Consumer-level Logic
+
+This design builds upon the existing **consumer-level logic**, which already 
follows **enable-only semantics**:
+
+```java
+// 
https://github.com/apache/pulsar/blob/8798a4662300dfa3d330dbb0277073874cc2d86e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1153-L1156
+if (replicated != null && replicated && !subscription.isReplicated()) {
+    subscription.setReplicated(true);
+}
+```
+
+Key points:
+
+* **Consumer-level default changed** from `false` to `null` ([PR 
#23757](https://github.com/apache/pulsar/pull/23757)).
+  * Combined with the multi-level policy evaluation, this ensures 
**compatibility across all client languages and versions**:
+      * Older clients defaulting to `false`
+      * Newer clients defaulting to `null`
+  * Both behave correctly with **broker, namespace, and topic-level policies**.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Broker level
+
+Add the field `replicateSubscriptionState` to the 
`org.apache.pulsar.broker.ServiceConfiguration` class
+to control subscription replication at the broker level:
+
+```java
+public class ServiceConfiguration implements PulsarConfiguration {
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            required = false,
+            dynamic = false,
+            doc = "The default value for replicating subscription state."
+    )
+    private Boolean replicateSubscriptionState;
+}
+```
+
+### Namespace level
+
+1. Add the field `replicate_subscription_state` to the 
`org.apache.pulsar.common.policies.data.Policies` class
+   to control subscription replication at the namespace level:
+    ```java
+    public class Policies {
+        @SuppressWarnings("checkstyle:MemberName")
+        public Boolean replicate_subscription_state;
+    }
+    ```
+2. Add the management methods to the 
`org.apache.pulsar.client.admin.Namespaces` interface:
+     ```java
+     public interface Namespaces {
+         void setReplicateSubscriptionState(String namespace, boolean enabled) 
throws PulsarAdminException;
+         CompletableFuture<Void> setReplicateSubscriptionStateAsync(String 
namespace, boolean enabled);
+         Boolean getReplicateSubscriptionState(String namespace) throws 
PulsarAdminException;
+         CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String 
namespace);
+         void removeReplicateSubscriptionState(String namespace) throws 
PulsarAdminException;
+         CompletableFuture<Void> removeReplicateSubscriptionStateAsync(String 
namespace);
+     }
+     ```
+3. Implement the management methods in the 
`org.apache.pulsar.client.admin.internal.NamespacesImpl` class.
+
+### Topic level
+
+1. Add the field `Boolean replicateSubscriptionState` to the 
`org.apache.pulsar.common.policies.data.TopicPolicies`
+   class to enable subscription replication at the topic level:
+   ```java
+    public class TopicPolicies {
+         public Boolean replicateSubscriptionState;

Review Comment:
   In the topic-level section, the snippet adds `public Boolean 
replicateSubscriptionState` to `TopicPolicies`, but `TopicPolicies` uses Lombok 
with `private` fields (not public members). Consider updating the snippet to 
match the existing class style (e.g., `private Boolean ...`) so that it does not 
mislead implementers.
   ```suggestion
            private Boolean replicateSubscriptionState;
   ```


