This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 96bc370e6b9 [fix][broker]Fix dirty reading of namespace level offload
thresholds (#24696)
96bc370e6b9 is described below
commit 96bc370e6b9a10e1656983d522e272a36a44fdae
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 4 16:45:22 2025 +0800
[fix][broker]Fix dirty reading of namespace level offload thresholds
(#24696)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 68 +++++++++++++++++-----
.../apache/pulsar/broker/admin/AdminApi2Test.java | 51 ++++++++++++++++
2 files changed, 104 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d393a71da72..0e821c83e61 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2182,7 +2182,7 @@ public abstract class NamespacesBase extends
AdminResource {
policies.offload_policies = new OffloadPoliciesImpl();
}
((OffloadPoliciesImpl)
policies.offload_policies).setManagedLedgerOffloadThresholdInBytes(newThreshold);
- policies.offload_threshold = newThreshold;
+ mergeOffloadThresholdsForCompatibility(policies);
return policies;
});
log.info("[{}] Successfully updated offloadThreshold
configuration: namespace={}, value={}",
@@ -2209,7 +2209,7 @@ public abstract class NamespacesBase extends
AdminResource {
}
((OffloadPoliciesImpl) policies.offload_policies)
.setManagedLedgerOffloadThresholdInSeconds(newThreshold);
- policies.offload_threshold_in_seconds =
newThreshold;
+ mergeOffloadThresholdsForCompatibility(policies);
return policies;
})
)
@@ -2239,7 +2239,7 @@ public abstract class NamespacesBase extends
AdminResource {
}
((OffloadPoliciesImpl) policies.offload_policies)
.setManagedLedgerOffloadDeletionLagInMillis(newDeletionLagMs);
- policies.offload_deletion_lag_ms = newDeletionLagMs;
+ mergeOffloadThresholdsForCompatibility(policies);
return policies;
});
log.info("[{}] Successfully updated offloadDeletionLagMs
configuration: namespace={}, value={}",
@@ -2352,6 +2352,50 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
+ /**
+ * Before https://github.com/apache/pulsar/pull/6183, users could set
broker-level offload policies, and handled
+ * namespace-level thresholds through the following fields:
+ * - {@link Policies#offload_deletion_lag_ms}
+ * - {@link Policies#offload_threshold}
+ * - {@link Policies#offload_threshold_in_seconds}
+ *
+ * After https://github.com/apache/pulsar/pull/6183, Pulsar supports
namespace-level offload policies, which are
+ * held in {@link Policies#offload_policies}, and the thresholds were
moved to the following fields:
+ * - {@link Policies#offload_policies} -> {@link
OffloadPoliciesImpl#getManagedLedgerOffloadDeletionLagInMillis}
+ * - {@link Policies#offload_policies} -> {@link
OffloadPoliciesImpl#getManagedLedgerOffloadThresholdInBytes}
+ * - {@link Policies#offload_policies} -> {@link
OffloadPoliciesImpl#getManagedLedgerOffloadThresholdInSeconds}
+ *
+ * To keep the offload policies compatible with the old fields, this method
uses the old values if the new values
+ * are not set. Once the new fields are set, the old fields are
ignored.
+ */
+ private void mergeOffloadThresholdsForCompatibility(Policies nsPolicies) {
+ Long oldOffloadDeletionLagMs = nsPolicies.offload_deletion_lag_ms;
+ Long oldOffloadThresholdInBytes = nsPolicies.offload_threshold;
+ Long odlOffloadThresholdInSeconds =
nsPolicies.offload_threshold_in_seconds;
+ // If the old values are empty, skip.
+ if (oldOffloadDeletionLagMs == null && oldOffloadThresholdInBytes ==
-1 && odlOffloadThresholdInSeconds == -1) {
+ return;
+ }
+ // If the new values are empty, use the old values.
+ OffloadPoliciesImpl nsOffloadPolicies = (OffloadPoliciesImpl)
nsPolicies.offload_policies;
+ if
(Objects.equals(nsOffloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
+ OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS) &&
oldOffloadDeletionLagMs != null) {
+
nsOffloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(oldOffloadDeletionLagMs);
+ }
+ if
(Objects.equals(nsOffloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
+ OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES) &&
oldOffloadThresholdInBytes != -1) {
+
nsOffloadPolicies.setManagedLedgerOffloadThresholdInBytes(oldOffloadThresholdInBytes);
+ }
+ if
(Objects.equals(nsOffloadPolicies.getManagedLedgerOffloadThresholdInSeconds(),
+ OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS) &&
odlOffloadThresholdInSeconds != -1) {
+
nsOffloadPolicies.setManagedLedgerOffloadThresholdInSeconds(odlOffloadThresholdInSeconds);
+ }
+ // Since the thresholds are moved into "nsPolicies.offload_policies",
remove the old fields.
+ nsPolicies.offload_deletion_lag_ms = null;
+ nsPolicies.offload_threshold = -1;
+ nsPolicies.offload_threshold_in_seconds = -1;
+ }
+
protected void internalSetOffloadPolicies(AsyncResponse asyncResponse,
OffloadPoliciesImpl offloadPolicies) {
validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD,
PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
@@ -2359,19 +2403,8 @@ public abstract class NamespacesBase extends
AdminResource {
try {
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
- if
(Objects.equals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
-
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) {
-
offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms);
- } else {
- policies.offload_deletion_lag_ms =
offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis();
- }
- if
(Objects.equals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
-
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES)) {
-
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold);
- } else {
- policies.offload_threshold =
offloadPolicies.getManagedLedgerOffloadThresholdInBytes();
- }
policies.offload_policies = offloadPolicies;
+ mergeOffloadThresholdsForCompatibility(policies);
return policies;
}).thenApply(r -> {
log.info("[{}] Successfully updated offload configuration:
namespace={}, map={}", clientAppId(),
@@ -2397,7 +2430,12 @@ public abstract class NamespacesBase extends
AdminResource {
validatePoliciesReadOnlyAccess();
try {
namespaceResources().setPoliciesAsync(namespaceName, (policies) ->
{
+ // Remove new offload policies.
policies.offload_policies = null;
+ // Remove the old offload policies thresholds.
+ policies.offload_deletion_lag_ms = null;
+ policies.offload_threshold = -1;
+ policies.offload_threshold_in_seconds = -1;
return policies;
}).thenApply(r -> {
log.info("[{}] Successfully remove offload configuration:
namespace={}", clientAppId(), namespaceName);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index bb23c6cf43a..db1f679404a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -125,11 +125,14 @@ import
org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -4041,6 +4044,54 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
Assert.assertTrue(permissions22.isEmpty());
}
+ @Test
+ public void testOverridesNamespaceOffloadThreshold() throws Exception {
+ String namespace = BrokerTestUtil.newUniqueName(this.defaultTenant +
"/ns");
+ String topic = BrokerTestUtil.newUniqueName("persistent://" +
namespace + "/tp");
+ admin.namespaces().createNamespace(namespace);
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topicPolicies().setDispatchRate(topic,
DispatchRate.builder().dispatchThrottlingRateInMsg(1).build());
+ // assert we get -1 which indicates it will fall back to default
+ assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1);
+
assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1);
+ // Set namespace level offloading threshold.
+ long m1 = 1024 * 1024;
+ long h1 = 1000 * 3600;
+ OffloadPoliciesImpl policies = OffloadPoliciesImpl.builder()
+ .managedLedgerOffloadDriver("S3")
+ .s3ManagedLedgerOffloadBucket("bucket-1")
+ .managedLedgerOffloadThresholdInBytes(m1)
+ .managedLedgerOffloadThresholdInSeconds(h1)
+ .build();
+ admin.namespaces().setOffloadPolicies(namespace, policies);
+ OffloadPolicies policies1 =
admin.namespaces().getOffloadPolicies(namespace);
+ assertEquals(policies1.getManagedLedgerOffloadThresholdInBytes(), m1);
+ assertEquals(policies1.getManagedLedgerOffloadThresholdInSeconds(),
h1);
+
+ long m2 = 2 * 1024 * 1024L;
+ long h2 = 2 * 1000 * 3600;
+ admin.namespaces().setOffloadThreshold(namespace, m2);
+ admin.namespaces().setOffloadThresholdInSeconds(namespace, h2);
+ OffloadPolicies policies2 =
admin.namespaces().getOffloadPolicies(namespace);
+ assertEquals(policies2.getManagedLedgerOffloadThresholdInBytes(), m2);
+ assertEquals(policies2.getManagedLedgerOffloadThresholdInSeconds(),
h2);
+ OffloadPolicies policies3 =
admin.topicPolicies().getOffloadPolicies(topic, true);
+ assertEquals(policies3.getManagedLedgerOffloadThresholdInBytes(), m2);
+ assertEquals(policies3.getManagedLedgerOffloadThresholdInSeconds(),
h2);
+
+ admin.namespaces().removeOffloadPolicies(namespace);
+ OffloadPolicies policies4 =
admin.namespaces().getOffloadPolicies(namespace);
+ assertTrue(policies4 == null);
+ assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1);
+
assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1);
+ OffloadPolicies policies5 =
admin.topicPolicies().getOffloadPolicies(topic, true);
+ assertTrue(policies5 == null);
+
+ // cleanup.
+ admin.topics().delete(topic);
+ admin.namespaces().deleteNamespace(namespace);
+ }
+
@Test
public void testDeletePatchyPartitionedTopic() throws Exception {
final String topic = BrokerTestUtil.newUniqueName(defaultNamespace +
"/tp");