codelipenghui commented on code in PR #25304:
URL: https://github.com/apache/pulsar/pull/25304#discussion_r2920544528


##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java:
##########
@@ -128,17 +115,12 @@ public String getTenant() {
         return tenant;
     }
 
-    @Deprecated
-    public String getCluster() {
-        return cluster;
-    }
-
     public String getLocalName() {
         return localName;
     }

Review Comment:
   **Note:** `isGlobal()` now always returns `true`. This is logically correct 
for V2, but callers like 
`PulsarWebResource.checkLocalOrGetPeerReplicationCluster()` previously skipped 
non-global namespaces. Now ALL namespaces trigger peer-replication metadata 
lookups, which adds overhead for single-cluster deployments.
   
   Also, dead `!isGlobal()` checks remain in files not touched by this PR:
   - `PersistentTopicsBase.java` (lines ~4967, ~5107) 
   - `NonPersistentTopic.java` (line ~586)
   - `PersistentTopic.java` (line ~1945)
   
   These should be cleaned up to avoid confusion.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java:
##########
@@ -179,11 +177,8 @@ public void updateTenant(@Suspended final AsyncResponse 
asyncResponse,
                     if (!tenantAdmin.isPresent()) {
                         throw new RestException(Status.NOT_FOUND, "Tenant " + 
tenant + " not found");
                     }
-                    TenantInfo oldTenantAdmin = tenantAdmin.get();
-                    Set<String> newClusters = new 
HashSet<>(newTenantAdmin.getAllowedClusters());
-                    return canUpdateCluster(tenant, 
oldTenantAdmin.getAllowedClusters(), newClusters);
+                    return tenantResources().updateTenantAsync(tenant, old -> 
newTenantAdmin);

Review Comment:
   **Safety concern:** The `canUpdateCluster()` validation was removed. 
Previously, updating a tenant checked that removed clusters had no active 
namespaces. Without this check, users can remove clusters from a tenant's 
allowed list while active namespaces still exist, potentially orphaning them.
   
   While V1 cluster hierarchy is gone, the safety check itself (preventing 
orphaned namespaces) is still valuable for V2.
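
   A minimal sketch of what such a check could look like for V2, assuming that a namespace "uses" a cluster when the cluster appears in its replication clusters. All class and method names here are hypothetical and are not Pulsar APIs; the real check would read namespace policies from metadata.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: these names are hypothetical and not part of Pulsar's API.
final class TenantClusterRemovalCheck {

    // Clusters that were allowed before the update but are not allowed after it.
    static Set<String> removedClusters(Set<String> oldAllowed, Set<String> newAllowed) {
        Set<String> removed = new HashSet<>(oldAllowed);
        removed.removeAll(newAllowed);
        return removed;
    }

    // Namespaces whose replication clusters include at least one removed cluster.
    // Assumption: the caller supplies a namespace -> replication clusters map.
    static Set<String> affectedNamespaces(Set<String> removed,
                                          Map<String, Set<String>> replicationClustersByNamespace) {
        Set<String> affected = new TreeSet<>();
        for (Map.Entry<String, Set<String>> entry : replicationClustersByNamespace.entrySet()) {
            for (String cluster : entry.getValue()) {
                if (removed.contains(cluster)) {
                    affected.add(entry.getKey());
                    break;
                }
            }
        }
        return affected;
    }

    public static void main(String[] args) {
        Set<String> removed = removedClusters(Set.of("us-west", "us-east"), Set.of("us-west"));
        Map<String, Set<String>> namespaces = Map.of(
                "tenant/ns-a", Set.of("us-west"),
                "tenant/ns-b", Set.of("us-west", "us-east"));
        Set<String> affected = affectedNamespaces(removed, namespaces);
        if (!affected.isEmpty()) {
            System.out.println("Reject update; namespaces still use removed clusters: " + affected);
        }
    }
}
```

   An update would be rejected whenever `affectedNamespaces` is non-empty, which keeps the intent of the old check without depending on the V1 cluster hierarchy.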



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -562,63 +549,51 @@ protected CompletableFuture<Void> 
internalDeleteNamespaceBundleAsync(String bund
                 clientAppId(), namespaceName, bundleRange, authoritative, 
force);
         return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.DELETE_BUNDLE)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
-                .thenCompose(__ -> {
-                    if (!namespaceName.isGlobal()) {
-                        return 
validateClusterOwnershipAsync(namespaceName.getCluster());
-                    }
-                    return CompletableFuture.completedFuture(null);
-                })
                 .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
                 .thenCompose(policies -> {
                     CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
-                    if (namespaceName.isGlobal()) {
-                        // Just keep the behavior of V1 namespace being the 
same as before.
-                        if (!namespaceName.isV2() && 
policies.replication_clusters.isEmpty()
-                                && policies.allowed_clusters.isEmpty()) {
-                            return CompletableFuture.completedFuture(null);
-                        }
-                        String cluster = 
policies.getClusterThatCanDeleteNamespace();
-                        if (cluster == null) {
-                            // There are still more than one clusters 
configured for the global namespace
-                            throw new 
RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
-                                    + namespaceName
-                                    + ". There are still more than one 
replication clusters configured.");
-                        }
-                        if (!cluster.equals(config().getClusterName())) { // 
No need to change.
-                            // the only replication cluster is other cluster, 
redirect
-                            future = 
clusterResources().getClusterAsync(cluster)
-                                    .thenCompose(clusterData -> {
-                                        if (clusterData.isEmpty()) {
-                                            throw new 
RestException(Status.NOT_FOUND,
-                                                    "Cluster " + cluster + " 
does not exist");
-                                        }
-                                        ClusterData replClusterData = 
clusterData.get();
-                                        URL replClusterUrl;
-                                        try {
-                                            if (!config().isTlsEnabled() || 
!isRequestHttps()) {
-                                                replClusterUrl = new 
URL(replClusterData.getServiceUrl());
-                                            } else if 
(StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
-                                                replClusterUrl = new 
URL(replClusterData.getServiceUrlTls());
-                                            } else {
-                                                throw new 
RestException(Status.PRECONDITION_FAILED,
-                                                        "The replication 
cluster does not provide TLS encrypted "
-                                                                + "service");
-                                            }
-                                        } catch (MalformedURLException 
malformedURLException) {
-                                            throw new 
RestException(malformedURLException);
+                    String cluster = 
policies.getClusterThatCanDeleteNamespace();
+                    if (cluster == null) {

Review Comment:
   **Bug (Critical):** This unconditionally calls 
`getClusterThatCanDeleteNamespace()`, which returns `null` when 
`replication_clusters.size() != 1`, including when it is **0**.
   
   Namespaces with zero replication clusters (common in single-cluster 
deployments) will fail to delete with the misleading error: *"more than one 
replication clusters configured"* when there are actually zero.
   
   Previously, the `isGlobal()` + `isV2()` guards allowed non-global/V1 
namespaces to skip this check. Now with `isGlobal()` always true, all 
namespaces hit this path.
   
   The same issue exists in `precheckWhenDeleteNamespace()` above.
   
   Fix:
   ```java
   String cluster = policies.getClusterThatCanDeleteNamespace();
   if (cluster == null) {
       if (policies.replication_clusters.isEmpty() && 
policies.allowed_clusters.isEmpty()) {
           // No replication configured, allow local deletion
       } else {
           throw new RestException(Status.PRECONDITION_FAILED, ...);
       }
   }
   ```
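
   To make the three cases explicit, here is a self-contained sketch of the decision the fix is aiming for. It models only what this comment describes (a single replication cluster decides where deletion happens; anything else is rejected unless nothing is configured). `DeleteNamespaceCheck`, `Decision`, and `decide` are hypothetical names, not Pulsar classes.

```java
import java.util.Set;

// Hypothetical stand-in for the deletion decision; not Pulsar's actual classes.
final class DeleteNamespaceCheck {

    enum Decision { DELETE_LOCALLY, REDIRECT, REJECT }

    static Decision decide(Set<String> replicationClusters,
                           Set<String> allowedClusters,
                           String localCluster) {
        if (replicationClusters.size() == 1) {
            // Exactly one replication cluster: delete here or redirect to it.
            String only = replicationClusters.iterator().next();
            return only.equals(localCluster) ? Decision.DELETE_LOCALLY : Decision.REDIRECT;
        }
        if (replicationClusters.isEmpty() && allowedClusters.isEmpty()) {
            // No replication configured at all: allow local deletion.
            return Decision.DELETE_LOCALLY;
        }
        // More than one replication cluster is still configured
        // (or allowed clusters remain while replication is empty).
        return Decision.REJECT;
    }

    public static void main(String[] args) {
        System.out.println(decide(Set.of(), Set.of(), "us-west"));
        System.out.println(decide(Set.of("us-east"), Set.of(), "us-west"));
        System.out.println(decide(Set.of("us-west", "us-east"), Set.of(), "us-west"));
    }
}
```

   Under these assumptions, the zero-cluster case deletes locally instead of failing with the "more than one" message.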



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -835,17 +805,7 @@ protected CompletableFuture<Void> 
internalSetNamespaceReplicationClusters(List<S
                     if (CollectionUtils.isEmpty(clusterIds)) {
                         throw new RestException(Status.PRECONDITION_FAILED, 
"ClusterIds should not be null or empty");
                     }
-                    if (!namespaceName.isGlobal() && !(clusterIds.size() == 1
-                            && 
clusterIds.get(0).equals(pulsar().getConfiguration().getClusterName()))) {
-                            throw new RestException(Status.PRECONDITION_FAILED,
-                                    "Cannot set replication on a non-global 
namespace");
-                    }
-                    Set<String> replicationClusterSet = 
Sets.newHashSet(clusterIds);
-                    if (replicationClusterSet.contains("global")) {
-                        throw new RestException(Status.PRECONDITION_FAILED,
-                                "Cannot specify global in the list of 
replication clusters");
-                    }
-                    return replicationClusterSet;
+                    return Sets.newHashSet(clusterIds);

Review Comment:
   **Note:** This removes the validation that rejects "global" in the 
replication clusters list. If old scripts or tooling pass "global" as a 
replication cluster, it is now accepted but will likely fail downstream when 
the broker tries to replicate to a non-existent cluster.



##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java:
##########
@@ -133,40 +126,20 @@ private TopicName(String completeTopicName) {
                 }
             }
 
-            // The fully qualified topic name can be in two different forms:
-            // new:    persistent://tenant/namespace/topic
-            // legacy: persistent://tenant/cluster/namespace/topic
-
+            // Expected format: persistent://tenant/namespace/topic
             List<String> parts = 
Splitter.on("://").limit(2).splitToList(completeTopicName);
             this.domain = TopicDomain.getEnum(parts.get(0));
 
             String rest = parts.get(1);
 
-            // The rest of the name can be in different forms:
-            // new:    tenant/namespace/<localName>
-            // legacy: tenant/cluster/namespace/<localName>
-            // Examples of localName:
-            // 1. some, name, xyz
-            // 2. xyz-123, feeder-2
-
-
-            parts = Splitter.on("/").limit(4).splitToList(rest);
+            // Expected format: tenant/namespace/<localName>

Review Comment:
   **Bug (Critical):** `limit(3)` silently reinterprets V1 topic names instead 
of rejecting them.
   
   For `persistent://tenant/cluster/namespace/topic`, this splits into 
`["tenant", "cluster", "namespace/topic"]` and is accepted as a valid V2 topic 
with tenant="tenant", namespace="cluster", localName="namespace/topic".
   
   Users with old V1 topic names would **silently produce/consume on the wrong 
namespace**. Unlike `NamespaceName`, which rejects 3-part names with a clear 
error, `TopicName` silently accepts them.
   
   Suggestion: keep `limit(4)`, detect `parts.size() == 4`, and throw:
   ```java
   } else if (parts.size() == 4) {
       throw new IllegalArgumentException(
           "V1 topic names (with cluster component) are no longer supported. "
           + "Please use the V2 format: '<domain>://tenant/namespace/topic'. Got: "
           + completeTopicName);
   }
   ```
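
   The splitting behavior can be reproduced in isolation. This sketch uses `String.split` with a limit rather than Guava's `Splitter` (an assumption made to keep it dependency-free; for these inputs the two produce the same parts). `TopicNameSplitSketch` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: shows how the split limit changes what a V1 name looks like.
final class TopicNameSplitSketch {

    // Splits the part after "://" into at most `limit` pieces.
    static List<String> split(String rest, int limit) {
        return Arrays.asList(rest.split("/", limit));
    }

    // Rejects names that still carry a fourth (cluster) component.
    static void requireV2(String rest) {
        if (split(rest, 4).size() == 4) {
            throw new IllegalArgumentException(
                    "V1 topic names (with cluster component) are no longer supported: " + rest);
        }
    }

    public static void main(String[] args) {
        String v1 = "tenant/cluster/namespace/topic";
        // With limit 3 the cluster is taken as the namespace.
        System.out.println(split(v1, 3));
        // With limit 4 the extra component is visible and can be rejected.
        System.out.println(split(v1, 4));
        requireV2("tenant/namespace/topic"); // accepted
    }
}
```

   With a limit of 3 the V1 name yields `[tenant, cluster, namespace/topic]`, while a limit of 4 exposes four parts that `requireV2` can reject.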



##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java:
##########
@@ -372,12 +320,14 @@ public static String fromPersistenceNamingEncoding(String 
mlName) {
             localName = Codec.decode(parts.get(3));
             return String.format("%s://%s/%s/%s", domain, tenant, 
namespacePortion, localName);
         } else if (parts.size() == 5) {
+            // Legacy V1 managed ledger name: 
tenant/cluster/namespace/domain/topic

Review Comment:
   **Warning (Critical):** For 5-part V1 managed ledger names like 
`tenant/cluster/namespace/domain/topic`, the cluster is silently dropped, 
producing `domain://tenant/namespace/topic`.
   
   If the original V1 namespace was `tenant/cluster/namespace`, this maps the 
topic to a V2 namespace `tenant/namespace` which may or may not exist. There's 
no logging to help operators diagnose issues during migration.
   
   Suggestion: add a warning log here:
   ```java
   } else if (parts.size() == 5) {
       log.warn("Converting V1 managed ledger name to V2 format, dropping cluster component: {}",
               mlName);
       ...
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java:
##########
@@ -173,7 +173,9 @@ protected String parseNamespaceFromLedgerName(String 
ledgerName) {
         Matcher m = V2_LEDGER_NAME_PATTERN.matcher(ledgerName);
 
         if (m.matches()) {
-            return m.group(1);
+            // Ledger name format: tenant/namespace/domain/topic
+            // Extract only tenant/namespace (groups 2 and 3), excluding the 
domain.
+            return m.group(2) + "/" + m.group(3);

Review Comment:
   **Bug + Breaking change:** For V2 ledger names this is a bugfix (old code 
incorrectly included domain in namespace label). But for V1 ledger names still 
in storage (`tenant/cluster/namespace/domain/topic`), this returns 
`tenant/cluster` instead of the correct namespace.
   
   Also, even for V2, this changes the metrics `namespace` dimension from 
`tenant/namespace/domain` to `tenant/namespace`, which will break existing 
dashboards and alerts.
   
   This should be documented as a known breaking change in the release notes.
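
   The V1 case can be illustrated with a small sketch. The regular expression below is an assumption about the shape of `V2_LEDGER_NAME_PATTERN` (three leading segments captured as groups 2 to 4, with group 1 spanning all three); it is chosen to be consistent with the diff above, not copied from the source. `LedgerNameSketch` is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: the pattern is an assumed shape, not copied from Pulsar.
final class LedgerNameSketch {

    // Assumed: group 1 = first three segments, groups 2-4 = each segment, group 5 = rest.
    static final Pattern ASSUMED_V2_LEDGER_NAME_PATTERN =
            Pattern.compile("^(([^/]+)/([^/]+)/([^/]+))/(.*)$");

    // Mirrors the new return value in the diff: group 2 + "/" + group 3.
    static String parseNamespace(String ledgerName) {
        Matcher m = ASSUMED_V2_LEDGER_NAME_PATTERN.matcher(ledgerName);
        return m.matches() ? m.group(2) + "/" + m.group(3) : null;
    }

    public static void main(String[] args) {
        // V2 ledger name: tenant/namespace/domain/topic
        System.out.println(parseNamespace("tenant/namespace/persistent/topic"));
        // V1 ledger name: tenant/cluster/namespace/domain/topic
        System.out.println(parseNamespace("tenant/cluster/namespace/persistent/topic"));
    }
}
```

   Under that assumed pattern, the V2 name yields `tenant/namespace`, while the V1 name also matches and yields `tenant/cluster`.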



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -142,12 +142,10 @@ public class NamespaceService implements AutoCloseable {
 
     public static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
     public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
-    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = 
Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
-    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = 
Pattern.compile("pulsar/([^:]+:\\d+)");
-    public static final Pattern SLA_NAMESPACE_PATTERN = 
Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
-    public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
-    public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
-    public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + 
"/%s/%s";
+    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = 
Pattern.compile("pulsar/([^:]+:\\d+)");
+    public static final Pattern SLA_NAMESPACE_PATTERN = 
Pattern.compile(SLA_NAMESPACE_PROPERTY + "/([^:]+:\\d+)");
+    public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s";
+    public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + 
"/%s";

Review Comment:
   **Rolling upgrade concern:** Heartbeat format changed from 
`pulsar/cluster/brokerId` to `pulsar/brokerId` and SLA from 
`sla-monitor/cluster/brokerId` to `sla-monitor/brokerId`. The V1 pattern 
matchers (`HEARTBEAT_NAMESPACE_PATTERN` with `pulsar/[^/]+/...`) are removed.
   
   During rolling upgrade, new brokers won't recognize old brokers' V1 
heartbeat namespaces. Old V1 heartbeat/SLA namespace entries in metadata could 
cause `IllegalArgumentException` when parsed as `NamespaceName` (3-part names 
now rejected).
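
   One way to probe this is to run the new pattern against both name formats. The pattern string below is the one on the `+` line of the diff; the sample broker names and the use of `matches()` (rather than however the broker applies the pattern) are assumptions. `HeartbeatPatternCheck` is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: exercises the new heartbeat pattern on sample names.
final class HeartbeatPatternCheck {

    // Same regex as the new HEARTBEAT_NAMESPACE_PATTERN in the diff.
    static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/([^:]+:\\d+)");

    // Returns the captured group on a full match, or null when the name does not match.
    static String capture(String namespace) {
        Matcher m = HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace);
        return m.matches() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // New format: pulsar/brokerId
        System.out.println(capture("pulsar/broker-1:8080"));
        // Old format: pulsar/cluster/brokerId
        System.out.println(capture("pulsar/use/broker-1:8080"));
    }
}
```

   Because `[^:]+` also matches `/`, the old-format sample is matched with the cluster folded into the captured value (`use/broker-1:8080`) rather than being rejected by the pattern itself, so mixed-version clusters are worth testing explicitly.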



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
