This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 941a31029c PHOENIX-7787 Make CCF HAGroupStore ZK Updates backward 
compatible with existing ZK based client (#2479)
941a31029c is described below

commit 941a31029c9e695e10fb7433d8ecebd5cda254d3
Author: ritegarg <[email protected]>
AuthorDate: Fri May 29 13:54:52 2026 -0700

    PHOENIX-7787 Make CCF HAGroupStore ZK Updates backward compatible with 
existing ZK based client (#2479)
    
    * PHOENIX-7787 Make CCF HAGroupStore ZK Updates backward compatible with 
existing ZK based client
    
    * PHOENIX-7787 Look up preserved peer role by URL in buildDesiredLegacyCrr
    
    ClusterRoleRecord's ctor canonicalizes url1/url2 by lexical order, so
    existing.getRole2() returns the role for whichever URL sorts larger -
    not necessarily the peer URL. When the local peer cache was empty and we
    tried to preserve the existing peer role, half the time we would inherit
    the local role instead; the equality check would then see a "logical
    change" and trigger a redundant CAS write.
    
    This manifested as flaky failures (depending on the alphabetical order of
    the deployment's ZK URLs) in
HAGroupStoreClientIT.testLegacyCrrSyncPreservesPreSeededRole2WhenPeerMissing
    and testLegacyCrrSyncStateOnlyChangeDoesNotRewriteLegacy under the full
    IT suite; clean 40/40 across two consecutive full-verify runs with the
    fix.
    
    Co-authored-by: Cursor <[email protected]>
    
    * PHOENIX-7787 Apply spotless formatting
    
    Pure formatting changes from `mvn spotless:apply` across the touched files;
    no behavior change.
    
    Generated-by: Cursor (Claude).
    Co-authored-by: Cursor <[email protected]>
    
    * PHOENIX-7787 Address PR review comments
    
    Co-authored-by: Cursor <[email protected]>
    
    ---------
    
    Co-authored-by: Ritesh Garg 
<[email protected]>
    Co-authored-by: Cursor <[email protected]>
---
 .../StaleClusterRoleRecordVersionException.java    |  35 ++
 .../org/apache/phoenix/jdbc/ClusterRoleRecord.java |  70 ++-
 .../apache/phoenix/jdbc/HAGroupStoreClient.java    | 288 ++++++++++-
 .../org/apache/phoenix/jdbc/PhoenixHAAdmin.java    |  94 +++-
 .../org/apache/phoenix/query/QueryServices.java    |  10 +
 .../apache/phoenix/query/QueryServicesOptions.java |  14 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |   6 +-
 .../apache/phoenix/jdbc/HAGroupStoreClientIT.java  | 558 ++++++++++++++++++++-
 .../apache/phoenix/jdbc/ClusterRoleRecordTest.java | 104 ++++
 .../json/test_role_record_explicit_rpc.json        |  10 +
 .../json/test_role_record_explicit_zk.json         |  10 +
 .../json/test_role_record_no_registry_type.json    |   9 +
 12 files changed, 1169 insertions(+), 39 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java
new file mode 100644
index 0000000000..e15a716fa6
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+/**
+ * CAS write to the legacy {@code /phoenix/ha} CRR znode failed (BadVersion or 
NodeExists); the
+ * caller can re-read and retry if needed. Analog of
+ * {@link StaleHAGroupStoreRecordVersionException}.
+ */
+public class StaleClusterRoleRecordVersionException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  public StaleClusterRoleRecordVersionException(String msg) {
+    super(msg);
+  }
+
+  public StaleClusterRoleRecordVersionException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
index 0ba6b312d7..7e5b597fb9 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Objects;
 import java.util.Optional;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -126,31 +127,50 @@ public class ClusterRoleRecord {
   private final long version;
 
   /**
-   * To handle backward compatibility with old ClusterRoleRecords which had 
zk1 and zk2 as keys for
-   * zk urls, This constructor is only being used {@link 
ClusterRoleRecord#fromJson} when we
-   * deserialize Cluster Role Record read from ZooKeeper ZNode. If CRR is in 
old format we will read
-   * zk1 and zk2 and url1 and url2 will be null and if it is in new format zk1 
and zk2 will be null
-   * in both cases final url is being stored in url1 and url2 url will be 
stored in normalized forms
-   * which looks like zk1\\:port1,zk2\\:port2,zk3\\:port3, 
zk4\\:port4,zk5\\:port5::znode or
-   * master1\\:port1,master2\\:port2,master3\\:port3, 
master4\\:port4,master5\\:port5
+   * Convenience constructor: defaults {@code registryType} to
+   * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). Use the explicit 
overload below to write
+   * {@link RegistryType#ZK} records for the legacy {@code /phoenix/ha} znode.
    * @param haGroupName HighAvailability Group name / CRR name
-   * @param policy      Policy used by give CRR
-   * @param url1        ZK/HMaster url based on registry type for first cluster
-   * @param role1       {@link ClusterRole} which describes the current state 
of first cluster
-   * @param url2        ZK/HMaster url based on registry type for second 
cluster
-   * @param role2       {@link ClusterRole} which describes the current state 
of second cluster
-   * @param version     version of a given CRR
+   * @param policy      Policy used by the given CRR
+   * @param url1        ZK/HMaster url for the first cluster
+   * @param role1       {@link ClusterRole} describing the current state of 
the first cluster
+   * @param url2        ZK/HMaster url for the second cluster
+   * @param role2       {@link ClusterRole} describing the current state of 
the second cluster
+   * @param version     monotonic version of this CRR
+   */
+  public ClusterRoleRecord(String haGroupName, HighAvailabilityPolicy policy, 
String url1,
+    ClusterRole role1, String url2, ClusterRole role2, long version) {
+    this(haGroupName, policy, DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE, url1, 
role1, url2, role2,
+      version);
+  }
+
+  /**
+   * Canonical constructor; also the {@code @JsonCreator} entry point so the 
persisted
+   * {@code registryType} round-trips correctly. Records persisted before 
{@code registryType} was
+   * added as a JSON field pass {@code null} here and default to
+   * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). URLs are normalized 
to the
+   * {@code registryType}-specific canonical form for accurate comparisons; 
the resulting
+   * {@code url1}/{@code url2} are stored as {@code 
zk1\:port1,zk2\:port2,...::znode} for ZK or
+   * {@code master1\:port1,master2\:port2,...} for RPC/MASTER.
+   * @param haGroupName  HighAvailability Group name / CRR name
+   * @param policy       Policy used by the given CRR
+   * @param registryType {@link RegistryType} for URL normalization; {@code 
null} defaults to
+   *                     {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC)
+   * @param url1         ZK/HMaster url for the first cluster
+   * @param role1        {@link ClusterRole} describing the current state of 
the first cluster
+   * @param url2         ZK/HMaster url for the second cluster
+   * @param role2        {@link ClusterRole} describing the current state of 
the second cluster
+   * @param version      monotonic version of this CRR
    */
   @JsonCreator
   public ClusterRoleRecord(@JsonProperty("haGroupName") String haGroupName,
-    @JsonProperty("policy") HighAvailabilityPolicy policy, 
@JsonProperty("url1") String url1,
+    @JsonProperty("policy") HighAvailabilityPolicy policy,
+    @JsonProperty("registryType") RegistryType registryType, 
@JsonProperty("url1") String url1,
     @JsonProperty("role1") ClusterRole role1, @JsonProperty("url2") String 
url2,
     @JsonProperty("role2") ClusterRole role2, @JsonProperty("version") long 
version) {
     this.haGroupName = haGroupName;
     this.policy = policy;
-
-    // Default registry type is RPC from Consistent Cluster Failover onwards
-    this.registryType = DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE;
+    this.registryType = registryType != null ? registryType : 
DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE;
 
     // Do we really need to normalize here ?
     // We are normalizing to have urls in specific formats for each 
registryType for getting
@@ -221,6 +241,22 @@ public class ClusterRoleRecord {
     return haGroupName.equals(other.haGroupName) && 
policy.equals(other.policy);
   }
 
+  /**
+   * Equality on the six identity/role fields ({@code haGroupName, policy, 
url1, url2, role1,
+   * role2}); ignores {@code version} (always bumps) and the {@code 
registryType} field itself.
+   * Same-registry callers only: {@code url1}/{@code url2} are normalized at 
construction per
+   * {@code registryType}, so cross-registry records will not compare equal 
even for the same
+   * underlying host:port. Returns {@code false} if {@code other} is {@code 
null}.
+   */
+  public boolean isLogicallyEqualIgnoringVersionAndRegistry(ClusterRoleRecord 
other) {
+    if (other == null) {
+      return false;
+    }
+    return Objects.equals(haGroupName, other.haGroupName) && 
Objects.equals(policy, other.policy)
+      && Objects.equals(url1, other.url1) && Objects.equals(url2, other.url2)
+      && role1 == other.role1 && role2 == other.role2;
+  }
+
   /** Returns true if CRR has any url in UNKNOWN role/state. */
   public boolean hasUnknownRole() {
     return role1 == ClusterRole.UNKNOWN || role2 == ClusterRole.UNKNOWN;
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index fe299d14fb..9d2c268eae 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.jdbc;
 
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_1;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1;
@@ -34,7 +35,11 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_2;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
 import static 
org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS;
+import static 
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS;
+import static 
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
 
@@ -56,17 +61,20 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
+import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException;
 import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException;
 import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
 import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType;
@@ -80,6 +88,7 @@ import org.slf4j.LoggerFactory;
 
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * Main implementation of HAGroupStoreClient with peer support. Write-through 
cache for HAGroupStore
@@ -98,8 +107,13 @@ public class HAGroupStoreClient implements Closeable {
   public static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1;
   // Maximum jitter in seconds for sync job start time (10 seconds)
   private static final long SYNC_JOB_MAX_JITTER_SECONDS = 10;
+  // Exclusive upper bound for initial-delay jitter on the periodic reconciler 
(0..30s).
+  private static final long LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS = 31;
   private PhoenixHAAdmin phoenixHaAdmin;
   private PhoenixHAAdmin peerPhoenixHaAdmin;
+  // Admin + NodeCache on /phoenix/ha; null when feature disabled.
+  private volatile PhoenixHAAdmin legacyHaAdmin;
+  private volatile NodeCache legacyCrrNodeCache;
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HAGroupStoreClient.class);
   // Map of <ZKUrl, <HAGroupName, HAGroupStoreClientInstance>>
   private static final Map<String, ConcurrentHashMap<String, 
HAGroupStoreClient>> instances =
@@ -132,6 +146,12 @@ public class HAGroupStoreClient implements Closeable {
     CopyOnWriteArraySet<HAGroupStateListener>> targetStateSubscribers = new 
ConcurrentHashMap<>();
   // Scheduled executor for periodic sync job
   private ScheduledExecutorService syncExecutor;
+  // Legacy CRR sync state. All invocations of syncLegacyCRRIfRoleChanged go 
through the
+  // single-threaded legacyCrrSyncExecutor (initial sync, periodic, and 
event-driven), so
+  // calls are naturally serialized; close races are handled via local 
snapshots of mutable
+  // fields inside the method. No additional lock is needed.
+  private final boolean legacyCrrSyncEnabled;
+  private volatile ScheduledExecutorService legacyCrrSyncExecutor;
 
   public static HAGroupStoreClient getInstance(Configuration conf, String 
haGroupName)
     throws SQLException {
@@ -164,8 +184,15 @@ public class HAGroupStoreClient implements Closeable {
             result.close();
             result = null;
           } else {
-            instances.putIfAbsent(localZkUrl, new ConcurrentHashMap<>());
-            instances.get(localZkUrl).put(haGroupName, result);
+            // Atomic register; pairs with deregisterFromInstances()'s 
computeIfPresent.
+            final HAGroupStoreClient created = result;
+            instances.compute(localZkUrl, (k, v) -> {
+              if (v == null) {
+                v = new ConcurrentHashMap<>();
+              }
+              v.put(haGroupName, created);
+              return v;
+            });
           }
         }
       }
@@ -224,6 +251,8 @@ public class HAGroupStoreClient implements Closeable {
       conf.getLong(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT) * 
ZK_SESSION_TIMEOUT_MULTIPLIER);
     this.rotationTimeMs = 
conf.getLong(QueryServices.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
       QueryServicesOptions.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+    this.legacyCrrSyncEnabled = 
conf.getBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED,
+      DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED);
     // Custom Event Listener
     this.peerCustomPathChildrenCacheListener = peerPathChildrenCacheListener;
     try {
@@ -246,6 +275,12 @@ public class HAGroupStoreClient implements Closeable {
       // Start periodic sync job
       startPeriodicSyncJob();
 
+      // Opt-in legacy /phoenix/ha sync. Setup failures propagate so the 
client is marked
+      // unhealthy rather than silently dropping the legacy znode out of sync.
+      if (legacyCrrSyncEnabled && this.isHealthy) {
+        setupLegacyCrrSync();
+      }
+
     } catch (Exception e) {
       this.isHealthy = false;
       close();
@@ -886,9 +921,22 @@ public class HAGroupStoreClient implements Closeable {
             if (cacheType == ClusterType.LOCAL) {
               maybeInitializePeerPathChildrenCache();
             }
+            // Offload the legacy CRR sync (it does ZK + JDBC I/O) so we don't 
block
+            // Curator's per-namespace event dispatcher.
+            ScheduledExecutorService syncExec = legacyCrrSyncExecutor;
+            if (syncExec != null) {
+              try {
+                syncExec.execute(this::syncLegacyCRRIfRoleChanged);
+              } catch (RejectedExecutionException ree) {
+                // Executor already shutting down (close() race); drop 
silently.
+                LOGGER.debug("Legacy CRR sync skipped for HA group {}: 
executor shut down",
+                  haGroupName);
+              }
+            }
           }
           break;
         case CHILD_REMOVED:
+          // No-op: the legacy /phoenix/ha znode is never deleted by this 
client.
           break;
         case INITIALIZED:
           latch.countDown();
@@ -981,34 +1029,66 @@ public class HAGroupStoreClient implements Closeable {
   }
 
   /**
-   * Shuts down the periodic sync executor gracefully.
+   * Remove this instance from the static {@link #instances} map. Idempotent. 
Uses value-based
+   * remove so that, if a concurrent {@link #getInstanceForZkUrl} has already 
swapped in a fresh
+   * replacement, the replacement is preserved. The outer-CHM {@code 
computeIfPresent} below pairs
+   * with the {@code compute} in {@link #getInstanceForZkUrl} to serialize 
bucket creation/removal
+   * on the same key.
    */
+  private void deregisterFromInstances() {
+    final String key = (this.zkUrl != null) ? this.zkUrl : 
getLocalZkUrl(this.conf);
+    if (key == null) {
+      return;
+    }
+    final ConcurrentHashMap<String, HAGroupStoreClient> bucket = 
instances.get(key);
+    if (bucket == null) {
+      return;
+    }
+    bucket.remove(this.haGroupName, this);
+    instances.computeIfPresent(key, (k, v) -> v.isEmpty() ? null : v);
+  }
+
   private void shutdownSyncExecutor() {
     if (syncExecutor != null) {
-      syncExecutor.shutdown();
-      try {
-        if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-          syncExecutor.shutdownNow();
-        }
-      } catch (InterruptedException e) {
-        syncExecutor.shutdownNow();
-        Thread.currentThread().interrupt();
-      }
+      MoreExecutors.shutdownAndAwaitTermination(syncExecutor, 5, 
TimeUnit.SECONDS);
       syncExecutor = null;
     }
   }
 
+  private void shutdownLegacyCrrSyncExecutor() {
+    if (legacyCrrSyncExecutor != null) {
+      MoreExecutors.shutdownAndAwaitTermination(legacyCrrSyncExecutor, 5, 
TimeUnit.SECONDS);
+      legacyCrrSyncExecutor = null;
+    }
+  }
+
   @Override
   public void close() {
     try {
       LOGGER.info("Closing HAGroupStoreClient");
-      // Shutdown sync executor
+      // Mark unhealthy and deregister from the static cache first so any 
concurrent
+      // getInstanceForZkUrl() does not hand out a half-closed instance.
+      isHealthy = false;
+      deregisterFromInstances();
+      // Executors -> caches -> admins. Null-before-close on legacy resources 
so a racing
+      // listener sees either a live or null reference, never half-closed.
       shutdownSyncExecutor();
+      shutdownLegacyCrrSyncExecutor();
       if (pathChildrenCache != null) {
         pathChildrenCache.close();
         pathChildrenCache = null;
       }
       closePeerConnection();
+      NodeCache nodeCache = this.legacyCrrNodeCache;
+      this.legacyCrrNodeCache = null;
+      if (nodeCache != null) {
+        nodeCache.close();
+      }
+      PhoenixHAAdmin admin = this.legacyHaAdmin;
+      this.legacyHaAdmin = null;
+      if (admin != null) {
+        admin.close();
+      }
       LOGGER.info("Closed HAGroupStoreClient");
     } catch (IOException e) {
       LOGGER.error("Exception closing HAGroupStoreClient", e);
@@ -1047,6 +1127,188 @@ public class HAGroupStoreClient implements Closeable {
     return Math.max(0, remainingTime);
   }
 
+  // ========== Legacy /phoenix/ha CRR Sync ==========
+
+  /**
+   * Derives the combined CRR from local + peer records and CAS-writes it to 
{@code /phoenix/ha}.
+   * CAS losses are logged and skipped; the next consistentHA cache event or 
periodic cycle
+   * reconverges.
+   */
+  private void syncLegacyCRRIfRoleChanged() {
+    if (!legacyCrrSyncEnabled || !isHealthy) {
+      return;
+    }
+    // Snapshot mutable resources up front so a concurrent close() can't null 
them mid-method
+    // and trigger NPEs / writes through a torn-down Curator client.
+    final PhoenixHAAdmin admin = this.legacyHaAdmin;
+    final NodeCache cache = this.legacyCrrNodeCache;
+    if (admin == null || cache == null) {
+      return;
+    }
+    try {
+      HAGroupStoreRecord local = getHAGroupStoreRecord();
+      if (local == null) {
+        LOGGER.debug("Skipping legacy CRR sync for HA group {}: no local 
consistentHA record",
+          haGroupName);
+        return;
+      }
+      // Wait for peer URL before building the desired CRR (ctor NPEs on null 
url2).
+      if (StringUtils.isBlank(local.getPeerZKUrl())) {
+        LOGGER.debug("Skipping legacy CRR sync for HA group {}: peer ZK URL is 
blank", haGroupName);
+        return;
+      }
+      HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer();
+      // NodeCache is eventually consistent; on apparent absence, fall back to 
an authoritative
+      // ZK read so the equality check and CAS both see consistent state.
+      Pair<ClusterRoleRecord, Stat> snapshot = readLegacyCrrSnapshot(cache);
+      if (snapshot.getRight() == null) {
+        snapshot = admin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+      }
+      ClusterRoleRecord existing = snapshot.getLeft();
+      Stat existingStat = snapshot.getRight();
+      if (!shouldWriteLegacyCrr(existing)) {
+        return;
+      }
+      ClusterRoleRecord desired = buildDesiredLegacyCrr(local, peer, existing);
+      if (desired.isLogicallyEqualIgnoringVersionAndRegistry(existing)) {
+        LOGGER.debug("Legacy CRR for HA group {} already up to date at version 
{}", haGroupName,
+          existing.getVersion());
+        return;
+      }
+      try {
+        if (existingStat == null) {
+          admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+            PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, /* ignored */ 0);
+        } else {
+          admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+            PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, 
existingStat.getVersion());
+        }
+        LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})", 
haGroupName,
+          existing != null ? existing.getVersion() : -1L, 
desired.getVersion());
+      } catch (StaleClusterRoleRecordVersionException stale) {
+        // CAS lost (another RS won); next event/periodic cycle reconverges. 
DEBUG: N-1 RSes
+        // log this per transition.
+        LOGGER.debug("Legacy CRR CAS lost for HA group {} at expected stat 
version {}", haGroupName,
+          existingStat != null ? existingStat.getVersion() : -1);
+      }
+    } catch (Exception e) {
+      LOGGER.warn(
+        "Legacy CRR sync failed for HA group {}; will be retried by next 
event/periodic cycle",
+        haGroupName, e);
+    }
+  }
+
+  /**
+   * Policy gate before issuing a CAS write to the legacy CRR. Returns {@code 
false} when the
+   * existing record must not be overwritten by this client.
+   */
+  private boolean shouldWriteLegacyCrr(ClusterRoleRecord existing) {
+    // Refuse to overwrite a non-ZK (admin-managed RPC) legacy CRR; live 
readers use its
+    // registryType to build connection strings, so swapping form would break 
them.
+    if (existing != null && existing.getRegistryType() != RegistryType.ZK) {
+      LOGGER.warn("Skipping legacy CRR sync for HA group {}: existing 
registryType={} "
+        + "(requires admin migration to ZK form)", haGroupName, 
existing.getRegistryType());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Builds the desired legacy CRR, always stamped {@link RegistryType#ZK}. 
When the local client's
+   * peer view is unavailable, preserves the {@code existing} record's peer 
role rather than
+   * downgrading it to {@link ClusterRole#UNKNOWN} — another RS with peer 
visibility would otherwise
+   * keep flipping it back, and the equality check naturally short-circuits 
when no other field
+   * changed.
+   * <p>
+   * Look up the preserved role by the peer URL (not by {@code getRole2()}) 
since
+   * {@link ClusterRoleRecord} canonicalizes {@code url1}/{@code url2} by 
alphabetical order; the
+   * peer URL may end up in either slot depending on lexical comparison.
+   */
+  private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, 
HAGroupStoreRecord peer,
+    ClusterRoleRecord existing) {
+    final ClusterRole role2;
+    if (peer != null) {
+      role2 = peer.getClusterRole();
+    } else if (existing != null) {
+      role2 = existing.getRole(JDBCUtil.formatUrl(local.getPeerZKUrl(), 
RegistryType.ZK));
+    } else {
+      role2 = ClusterRole.UNKNOWN;
+    }
+    long peerAdminVersion = (peer != null) ? peer.getAdminCRRVersion() : 0L;
+    long baseVersion = Math.max(existing != null ? existing.getVersion() : 0L,
+      Math.max(local.getAdminCRRVersion(), peerAdminVersion));
+    return new ClusterRoleRecord(haGroupName, 
HighAvailabilityPolicy.valueOf(local.getPolicy()),
+      RegistryType.ZK, this.zkUrl, local.getClusterRole(), 
local.getPeerZKUrl(), role2,
+      baseVersion + 1);
+  }
+
+  /**
+   * NodeCache snapshot of the legacy CRR. {@code (null, null)} on cache miss; 
callers must confirm
+   * absence with an authoritative ZK read since the cache is eventually 
consistent. Caller passes
+   * the cache from a local snapshot, never the mutable field (which {@link 
#close()} may null at
+   * any time).
+   */
+  private Pair<ClusterRoleRecord, Stat> readLegacyCrrSnapshot(NodeCache cache) 
{
+    ChildData current = cache.getCurrentData();
+    if (current == null) {
+      return Pair.of(null, null);
+    }
+    ClusterRoleRecord record = 
ClusterRoleRecord.fromJson(current.getData()).orElse(null);
+    return Pair.of(record, current.getStat());
+  }
+
+  /**
+   * Initialize legacy {@code /phoenix/ha} sync: admin handle, NodeCache, 
single-thread executor, an
+   * off-lock initial sync, and the periodic reconciler. Called only when the 
feature is enabled and
+   * the client is healthy.
+   */
+  private void setupLegacyCrrSync() throws Exception {
+    this.legacyHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, 
PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE);
+    this.legacyCrrNodeCache = new NodeCache(this.legacyHaAdmin.getCurator(), 
toPath(haGroupName));
+    // Async start; rebuild() below warms the cache off-lock.
+    this.legacyCrrNodeCache.start(false);
+    this.legacyCrrSyncExecutor = Executors.newSingleThreadScheduledExecutor(r 
-> {
+      Thread t = new Thread(r, "HAGroupStoreClient-LegacyCRRSync-" + 
haGroupName);
+      t.setDaemon(true);
+      return t;
+    });
+    // Warm NodeCache and run initial sync off-thread so neither blocks the 
static
+    // HAGroupStoreClient.class monitor held by getInstanceForZkUrl on ZK/JDBC 
I/O. The sync's
+    // empty-snapshot fallback handles the race where it runs before rebuild() 
lands.
+    final NodeCache cacheRef = this.legacyCrrNodeCache;
+    this.legacyCrrSyncExecutor.execute(() -> {
+      try {
+        cacheRef.rebuild();
+      } catch (Exception e) {
+        LOGGER.debug("Legacy CRR cache rebuild failed for HA group {}", 
haGroupName, e);
+      }
+    });
+    this.legacyCrrSyncExecutor.execute(this::syncLegacyCRRIfRoleChanged);
+    startLegacyCrrReconciliation();
+  }
+
+  /** Schedules the periodic reconciler; no-op when {@code intervalSec <= 0}. 
*/
+  private void startLegacyCrrReconciliation() {
+    long intervalSec = 
conf.getLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS,
+      DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS);
+    if (intervalSec <= 0) {
+      LOGGER.info("Legacy CRR periodic reconciliation disabled (interval={}s) 
for HA group {}",
+        intervalSec, haGroupName);
+      return;
+    }
+    long jitterSec =
+      ThreadLocalRandom.current().nextLong(0, 
LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS);
+    LOGGER.info("Starting legacy CRR reconciliation for HA group {} with 
initial delay {}s, "
+      + "then every {}s", haGroupName, jitterSec, intervalSec);
+    legacyCrrSyncExecutor.scheduleAtFixedRate(() -> {
+      try {
+        syncLegacyCRRIfRoleChanged();
+      } catch (Throwable t) {
+        LOGGER.warn("Periodic legacy CRR reconciliation failed for HA group 
{}", haGroupName, t);
+      }
+    }, jitterSec, intervalSec, TimeUnit.SECONDS);
+  }
+
   // ========== HA Group State Change Subscription Methods ==========
 
   /**
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
index f0fa718952..0eaa35e163 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
@@ -39,6 +39,7 @@ import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException;
 import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.zookeeper.CreateMode;
@@ -83,6 +84,25 @@ public class PhoenixHAAdmin implements Closeable {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(PhoenixHAAdmin.class);
 
+  /**
+   * ZK's wildcard for {@code setData()/delete()}: passing it as the expected version bypasses the
+   * version check. Used by {@link LegacyCrrWriteMode#FORCE_OVERWRITE} writes.
+   */
+  static final int ZK_MATCH_ANY_VERSION = -1;
+
+  /**
+   * Write mode for {@link #createOrUpdateClusterRoleRecordWithCAS}; it selects which ZooKeeper
+   * operation is issued for the legacy CRR znode. The accompanying {@code expectedStatVersion}
+   * argument is interpreted only for {@link #CAS_WITH_VERSION}.
+   * <p>
+   * Phoenix-internal. Not part of any public API contract — values may be added or renamed without
+   * notice. Do not switch exhaustively from outside this module.
+   */
+  public enum LegacyCrrWriteMode {
+    /**
+     * Create the znode (creating parents if needed) as a persistent node; no prior version is
+     * expected. An already existing znode is reported as a stale-version failure.
+     */
+    CREATE_NEW,
+    /**
+     * Unconditional overwrite (no CAS): the data is set with {@link #ZK_MATCH_ANY_VERSION}. For
+     * operator/migration tooling only.
+     */
+    FORCE_OVERWRITE,
+    /**
+     * CAS update: the data is set only if the znode is still at {@code expectedStatVersion},
+     * which must be {@code >= 0}. A version mismatch is reported as a stale-version failure.
+     */
+    CAS_WITH_VERSION
+  }
+
   /** The fully qualified ZK URL for an HBase cluster in format 
host:port:/hbase */
   private final String zkUrl;
   /** Configuration of this command line tool. */
@@ -524,17 +544,18 @@ public class PhoenixHAAdmin implements Closeable {
   }
 
   /**
-   * Gets the HAGroupStoreRecord and Stat from ZooKeeper.
+   * Gets the HAGroupStoreRecord and Stat from ZooKeeper. Reads (record, stat) 
atomically via
+   * {@code storingStatIn} so the returned stat version always corresponds to 
the returned bytes.
    * @param haGroupName the HA group name
-   * @return a pair of HAGroupStoreRecord and Stat
+   * @return a pair of HAGroupStoreRecord and Stat; both {@code null} if the 
znode does not exist
    * @throws IOException if any error occurs during the retrieval
    */
   public Pair<HAGroupStoreRecord, Stat> 
getHAGroupStoreRecordInZooKeeper(String haGroupName)
     throws IOException {
     try {
-      byte[] data = getCurator().getData().forPath(toPath(haGroupName));
+      Stat stat = new Stat();
+      byte[] data = 
getCurator().getData().storingStatIn(stat).forPath(toPath(haGroupName));
       HAGroupStoreRecord record = 
HAGroupStoreRecord.fromJson(data).orElse(null);
-      Stat stat = getCurator().checkExists().forPath(toPath(haGroupName));
       return Pair.of(record, stat);
     } catch (KeeperException.NoNodeException nne) {
       LOG.warn("No HAGroupStoreRecord for HA group {} in ZK", haGroupName, 
nne);
@@ -559,6 +580,71 @@ public class PhoenixHAAdmin implements Closeable {
     }
   }
 
+  // ----- Legacy /phoenix/ha ClusterRoleRecord sync helpers -----
+
+  /**
+   * Reads the legacy CRR znode of the given HA group, fetching payload and {@link Stat} in one
+   * {@code getData().storingStatIn(...)} call so that both describe the same znode version.
+   * @param haGroupName the HA group whose legacy CRR znode is read
+   * @return the parsed record (or {@code null} when {@code ClusterRoleRecord.fromJson} yields no
+   *         value) paired with the znode stat; {@code (null, null)} when the znode does not exist
+   * @throws IOException if the read fails for any reason other than a missing znode
+   */
+  public Pair<ClusterRoleRecord, Stat> getClusterRoleRecordAndStatInZooKeeper(String haGroupName)
+    throws IOException {
+    final Stat znodeStat = new Stat();
+    try {
+      final byte[] payload =
+        getCurator().getData().storingStatIn(znodeStat).forPath(toPath(haGroupName));
+      return Pair.of(ClusterRoleRecord.fromJson(payload).orElse(null), znodeStat);
+    } catch (KeeperException.NoNodeException missingZnode) {
+      // A missing znode is an expected outcome, not an error: report "nothing there".
+      return Pair.of(null, null);
+    } catch (Exception failure) {
+      LOG.error("Failed to get ClusterRoleRecord for HA group {}", haGroupName, failure);
+      throw new IOException("Failed to get ClusterRoleRecord for HA group " + haGroupName,
+        failure);
+    }
+  }
+
+  /**
+   * Writes {@code newRecord} to the legacy CRR znode of {@code haGroupName} per {@code mode}.
+   * {@code expectedStatVersion} is used only for {@link LegacyCrrWriteMode#CAS_WITH_VERSION}
+   * (must be {@code >= 0}). Both BadVersion and NodeExists surface as
+   * {@link StaleClusterRoleRecordVersionException}.
+   * <p>
+   * Arguments are validated before any ZooKeeper call: {@code mode} and {@code newRecord} must be
+   * non-null, and a negative {@code expectedStatVersion} is rejected for CAS writes.
+   * @param haGroupName         the HA group whose legacy CRR znode is written
+   * @param newRecord           the record to serialize and store; must not be {@code null}
+   * @param mode                which ZooKeeper operation to issue; must not be {@code null}
+   * @param expectedStatVersion znode stat version the CAS write expects; ignored for other modes
+   * @throws IOException                            on any other failure, including interruption
+   *                                                (the thread's interrupt flag is restored)
+   * @throws StaleClusterRoleRecordVersionException if the CAS version does not match, or the
+   *                                                znode already exists for a create
+   * @throws IllegalArgumentException               if {@code mode} is CAS_WITH_VERSION and
+   *                                                {@code expectedStatVersion} is negative
+   */
+  public void createOrUpdateClusterRoleRecordWithCAS(String haGroupName,
+    ClusterRoleRecord newRecord, LegacyCrrWriteMode mode, int expectedStatVersion)
+    throws IOException, StaleClusterRoleRecordVersionException {
+    Preconditions.checkNotNull(mode, "mode");
+    // Fail fast on a missing record instead of letting it reach serialization inside the try
+    // block, where the failure would be reported as a generic I/O error.
+    Preconditions.checkNotNull(newRecord, "newRecord");
+    if (mode == LegacyCrrWriteMode.CAS_WITH_VERSION) {
+      Preconditions.checkArgument(expectedStatVersion >= 0,
+        "CAS_WITH_VERSION requires expectedStatVersion >= 0; got " + expectedStatVersion);
+    }
+    try {
+      byte[] data = ClusterRoleRecord.toJson(newRecord);
+      switch (mode) {
+        case CREATE_NEW:
+          getCurator().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .forPath(toPath(haGroupName), data);
+          break;
+        case FORCE_OVERWRITE:
+          getCurator().setData().withVersion(ZK_MATCH_ANY_VERSION).forPath(toPath(haGroupName),
+            data);
+          break;
+        case CAS_WITH_VERSION:
+          getCurator().setData().withVersion(expectedStatVersion).forPath(toPath(haGroupName),
+            data);
+          break;
+        default:
+          throw new IllegalStateException("Unhandled LegacyCrrWriteMode: " + mode);
+      }
+    } catch (KeeperException.BadVersionException e) {
+      throw new StaleClusterRoleRecordVersionException(
+        "CAS failed for HA group " + haGroupName + " at expectedStatVersion " + expectedStatVersion,
+        e);
+    } catch (KeeperException.NodeExistsException e) {
+      throw new StaleClusterRoleRecordVersionException(
+        "Create failed for HA group " + haGroupName + ": node already exists", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so the calling thread can still observe the interruption
+      // after it has been translated into an IOException.
+      Thread.currentThread().interrupt();
+      LOG.error("Interrupted while writing ClusterRoleRecord for HA group {}", haGroupName, e);
+      throw new IOException(
+        "Interrupted while writing ClusterRoleRecord for HA group " + haGroupName, e);
+    } catch (Exception e) {
+      LOG.error("Failed to write ClusterRoleRecord for HA group {}", haGroupName, e);
+      throw new IOException("Failed to write ClusterRoleRecord for HA group " + haGroupName, e);
+    }
+  }
+
   /** Returns the value of the {@code zkUrl} field. */
   public String getZkUrl() {
     return zkUrl;
   }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 72075853aa..42514d7329 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -647,6 +647,16 @@ public interface QueryServices extends SQLCloseable {
   // HA Group Store sync job interval in seconds
   String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 
"phoenix.ha.group.store.sync.interval.seconds";
 
+  // "CRR" = Cluster Role Record. Master switch for syncing the legacy 
/phoenix/ha cluster
+  // role record from /phoenix/consistentHA. When false, no legacy znode is 
read, written, or
+  // deleted by HAGroupStoreClient.
+  String PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED = 
"phoenix.ha.legacy.crr.sync.enabled";
+
+  // Periodic reconciliation interval for the legacy /phoenix/ha cluster role record sync, in
+  // seconds. A value <= 0 disables the periodic loop only; event-driven sync still runs.
+  String PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS =
+    "phoenix.ha.legacy.crr.reconciliation.interval.seconds";
+
   String REPLICATION_LOG_ROTATION_TIME_MS_KEY = 
"phoenix.replication.log.rotation.time.ms";
 
   /**
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 339b6763a4..3e99b9fff6 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -92,6 +92,8 @@ import static 
org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_AT
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
 import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS;
+import static 
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED;
 import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
 import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB;
@@ -517,6 +519,13 @@ public class QueryServicesOptions {
   // Default HA Group Store sync job interval in seconds (15 minutes = 900 
seconds)
   public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900;
 
+  // Legacy /phoenix/ha CRR sync is opt-in (default off).
+  public static final boolean DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED = 
false;
+
+  // Periodic reconciliation interval for legacy /phoenix/ha CRR sync, in seconds.
+  // A value <= 0 disables the periodic loop only; event-driven sync still runs.
+  public static final long 
DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS = 60L;
+
   public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 
1000L;
 
   private final Configuration config;
@@ -643,7 +652,10 @@ public class QueryServicesOptions {
       .setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS,
         DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS)
       .setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED,
-        DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED);
+        DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED)
+      .setIfUnset(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, 
DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED)
+      .setIfUnset(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS,
+        DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS);
 
     // HBase sets this to 1, so we reset it to something more appropriate.
     // Hopefully HBase will change this, because we can't know if a user set
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index edb38da970..1f57187a2c 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -4738,9 +4738,9 @@ public class MetaDataClient {
           /**
            * To check if TTL is defined at any of the child below we are 
checking it at
            * {@link 
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, 
ColumnMutator, int, PTable, PTable, boolean)}
-           * level where in function
-           * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# 
validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], 
byte[], byte[], List, int)}
-           * we are already traversing through allDescendantViews.
+           * level where in function {@link 
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
+           * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, 
byte[], byte[],
+           * byte[], List, int)} we are already traversing through 
allDescendantViews.
            */
         }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index ba66787674..8997fc2bc4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -18,9 +18,12 @@
 package org.apache.phoenix.jdbc;
 
 import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static 
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS;
+import static 
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED;
 import static 
org.apache.phoenix.replication.reader.ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
@@ -34,6 +37,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -47,13 +51,16 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
+import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException;
 import org.apache.phoenix.util.HAGroupStoreTestUtil;
+import org.apache.phoenix.util.JDBCUtil;
 import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.Before;
@@ -72,6 +79,8 @@ public class HAGroupStoreClientIT extends HABaseIT {
   private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 5000L;
   private PhoenixHAAdmin haAdmin;
   private PhoenixHAAdmin peerHaAdmin;
+  // Admin on the legacy /phoenix/ha namespace; used to inspect/seed/corrupt 
the legacy znode.
+  private PhoenixHAAdmin legacyHaAdmin;
   private String zkUrl;
   private String peerZKUrl;
   private String masterUrl;
@@ -95,8 +104,11 @@ public class HAGroupStoreClientIT extends HABaseIT {
       ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE);
     peerHaAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
       ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE);
+    legacyHaAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(),
+      PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE);
     
haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
     
peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
+    
legacyHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
     zkUrl = getLocalZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration());
     // Clean existing records in system table
     List<String> haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl);
@@ -117,8 +129,10 @@ public class HAGroupStoreClientIT extends HABaseIT {
   public void after() throws Exception {
     
haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
     
peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
+    
legacyHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
     haAdmin.close();
     peerHaAdmin.close();
+    legacyHaAdmin.close();
   }
 
   @Test
@@ -752,7 +766,6 @@ public class HAGroupStoreClientIT extends HABaseIT {
   @Test
   public void testSetHAGroupStatusIfNeededWithUnhealthyClient() throws 
Exception {
     String haGroupName = testName.getMethodName();
-
     HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient
       .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), 
haGroupName, zkUrl);
 
@@ -1203,4 +1216,547 @@ public class HAGroupStoreClientIT extends HABaseIT {
       assertTrue("Sync executor should be shutdown after close", 
syncExecutor.isShutdown());
     }
   }
+
+  // 
============================================================================================
+  // Legacy /phoenix/ha CRR sync tests
+  // Verify feature-flag gating, derivation, monotonic version, registry-type 
preservation,
+  // deletion mirroring, and short-circuit behavior of HAGroupStoreClient's 
legacy sync path.
+  // 
============================================================================================
+
+  /**
+   * With the legacy sync flag disabled, creating a client must not produce a legacy CRR: after
+   * the propagation window neither a record nor a stat is readable for the group.
+   */
+  @Test
+  public void testLegacyCrrSyncFeatureOffByDefault_NoLegacyZnodeWritten() throws Exception {
+    final String groupName = testName.getMethodName();
+    final Configuration syncOffConf =
+      legacyCrrConf(/* legacyEnabled */ false, /* periodicSec */ 60);
+    assertNotNull(HAGroupStoreClient.getInstanceForZkUrl(syncOffConf, groupName, zkUrl));
+
+    // Give any (unexpected) sync activity time to surface before inspecting the legacy znode.
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    final Pair<ClusterRoleRecord, Stat> legacyState = readLegacyCrr(groupName);
+    assertNull("Legacy CRR must not exist when feature is off", legacyState.getLeft());
+    assertNull("Legacy znode stat must be null when feature is off", legacyState.getRight());
+  }
+
+  /**
+   * With the sync enabled, the initial sync must create a legacy CRR that carries the ZK
+   * registry type, the group's name and FAILOVER policy, an ACTIVE local role and a positive
+   * version.
+   */
+  @Test
+  public void testLegacyCrrSyncFeatureOn_InitialSyncCreatesZkRegistryLegacyZnode()
+    throws Exception {
+    final String groupName = testName.getMethodName();
+    final Configuration syncOnConf = legacyCrrConf(true, 60);
+    assertNotNull(HAGroupStoreClient.getInstanceForZkUrl(syncOnConf, groupName, zkUrl));
+
+    final ClusterRoleRecord legacyRecord = awaitLegacyCrrPresent(groupName).getLeft();
+    assertEquals("Legacy CRR must use ZK registry type for backward compatibility",
+      ClusterRoleRecord.RegistryType.ZK, legacyRecord.getRegistryType());
+    assertEquals(groupName, legacyRecord.getHaGroupName());
+    assertEquals(HighAvailabilityPolicy.FAILOVER, legacyRecord.getPolicy());
+    // The System Table seed makes the local cluster ACTIVE.
+    assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE,
+      legacyRecord.getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+    assertTrue("CRR version must be > 0 after initial sync", legacyRecord.getVersion() > 0);
+  }
+
+  /**
+   * A local role change (ACTIVE -> ACTIVE_TO_STANDBY) must reach the legacy CRR with a higher
+   * version, compare as newer than the previous record, and keep the ZK registry type.
+   */
+  @Test
+  public void testLegacyCrrSyncRoleChangePropagatesAndIsNewerThanWorks() throws Exception {
+    final String groupName = testName.getMethodName();
+    final HAGroupStoreClient storeClient =
+      HAGroupStoreClient.getInstanceForZkUrl(legacyCrrConf(true, 60), groupName, zkUrl);
+    assertNotNull(storeClient);
+
+    final ClusterRoleRecord beforeChange = awaitLegacyCrrPresent(groupName).getLeft();
+    final long versionBeforeChange = beforeChange.getVersion();
+
+    // Moving ACTIVE_IN_SYNC -> ACTIVE_IN_SYNC_TO_STANDBY changes the role to ACTIVE_TO_STANDBY.
+    storeClient
+      .setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+    final ClusterRoleRecord afterChange = awaitLegacyCrrRole(groupName, ClusterType.LOCAL,
+      ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).getLeft();
+
+    assertTrue("Version must monotonically increase after role change",
+      afterChange.getVersion() > versionBeforeChange);
+    assertTrue("isNewerThan must return true for the updated record",
+      afterChange.isNewerThan(beforeChange));
+    assertEquals(ClusterRoleRecord.RegistryType.ZK, afterChange.getRegistryType());
+  }
+
+  /**
+   * A state transition that keeps the cluster role unchanged must leave the legacy znode alone:
+   * neither its ZK stat version nor the record's logical version may move.
+   */
+  @Test
+  public void testLegacyCrrSyncStateOnlyChangeDoesNotRewriteLegacy() throws Exception {
+    final String groupName = testName.getMethodName();
+    final HAGroupStoreClient storeClient =
+      HAGroupStoreClient.getInstanceForZkUrl(legacyCrrConf(true, 60), groupName, zkUrl);
+    assertNotNull(storeClient);
+
+    final Pair<ClusterRoleRecord, Stat> baseline = awaitLegacyCrrPresent(groupName);
+    final int baselineStatVersion = baseline.getRight().getVersion();
+    final long baselineRecordVersion = baseline.getLeft().getVersion();
+
+    // The role is ACTIVE both before (ACTIVE_IN_SYNC) and after (ACTIVE_NOT_IN_SYNC).
+    storeClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    final Pair<ClusterRoleRecord, Stat> current = readLegacyCrr(groupName);
+    assertNotNull(current.getLeft());
+    assertEquals("Legacy CRR ZK stat version must not change on state-only transitions",
+      baselineStatVersion, current.getRight().getVersion());
+    assertEquals("Legacy CRR logical version must not change on state-only transitions",
+      baselineRecordVersion, current.getLeft().getVersion());
+  }
+
+  /**
+   * Deleting the LOCAL consistentHA record (CHILD_REMOVED) must not remove the legacy znode, and
+   * the legacy CRR version must not go backwards.
+   */
+  @Test
+  public void testLegacyCrrSyncLocalChildRemovedDoesNotDeleteLegacy() throws Exception {
+    final String groupName = testName.getMethodName();
+    final Configuration syncOnConf = legacyCrrConf(true, 60);
+    assertNotNull(HAGroupStoreClient.getInstanceForZkUrl(syncOnConf, groupName, zkUrl));
+    final long versionBeforeRemoval = awaitLegacyCrrPresent(groupName).getLeft().getVersion();
+
+    haAdmin.deleteHAGroupStoreRecordInZooKeeper(groupName);
+
+    // Leave enough time for an event-driven delete to have happened, had one been triggered.
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    final ClusterRoleRecord survivor = readLegacyCrr(groupName).getLeft();
+    assertNotNull("Legacy znode must NOT be deleted on LOCAL CHILD_REMOVED", survivor);
+    assertTrue("Legacy CRR version must not regress after LOCAL CHILD_REMOVED",
+      survivor.getVersion() >= versionBeforeRemoval);
+  }
+
+  /**
+   * With the periodic reconciler switched off (interval 0), a local role change must still be
+   * mirrored into the legacy CRR through the event-driven path.
+   */
+  @Test
+  public void testLegacyCrrSyncPeriodicDisabledStillSyncsViaEvents() throws Exception {
+    final String groupName = testName.getMethodName();
+    // Interval 0 turns the periodic loop off.
+    final Configuration eventsOnlyConf = legacyCrrConf(true, 0);
+    final HAGroupStoreClient storeClient =
+      HAGroupStoreClient.getInstanceForZkUrl(eventsOnlyConf, groupName, zkUrl);
+    assertNotNull(storeClient);
+
+    final ClusterRoleRecord beforeEvent = awaitLegacyCrrPresent(groupName).getLeft();
+    assertEquals("Initial local role should be ACTIVE per @Before seed",
+      ClusterRoleRecord.ClusterRole.ACTIVE,
+      beforeEvent.getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+    assertEquals("Initial registry type must be ZK", ClusterRoleRecord.RegistryType.ZK,
+      beforeEvent.getRegistryType());
+
+    storeClient
+      .setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+    final Pair<ClusterRoleRecord, Stat> synced = awaitLegacyCrrRole(groupName,
+      ClusterType.LOCAL, ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY);
+    final ClusterRoleRecord afterEvent = synced.getLeft();
+
+    assertTrue("Updated version must monotonically advance past the initial version",
+      afterEvent.getVersion() > beforeEvent.getVersion());
+    assertTrue("isNewerThan must return true for the post-event record",
+      afterEvent.isNewerThan(beforeEvent));
+    assertEquals("Registry type must remain ZK after an event-driven sync",
+      ClusterRoleRecord.RegistryType.ZK, afterEvent.getRegistryType());
+  }
+
+  /**
+   * Deleting the PEER consistentHA record (CHILD_REMOVED) must not remove the legacy znode, and
+   * the legacy CRR version must not go backwards.
+   */
+  @Test
+  public void testLegacyCrrSyncPeerChildRemovedDoesNotDeleteLegacy() throws Exception {
+    final String groupName = testName.getMethodName();
+    // A peer record has to exist up front; otherwise there is nothing whose removal could
+    // raise PEER CHILD_REMOVED later.
+    final HAGroupStoreRecord seededPeer =
+      new HAGroupStoreRecord("v1.0", groupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
+        HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl,
+        CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L);
+    createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, groupName, seededPeer);
+
+    // Interval 0 switches the periodic loop off so only event handling is exercised.
+    final Configuration eventsOnlyConf = legacyCrrConf(true, 0);
+    assertNotNull(HAGroupStoreClient.getInstanceForZkUrl(eventsOnlyConf, groupName, zkUrl));
+
+    final long versionBeforeRemoval = awaitLegacyCrrPresent(groupName).getLeft().getVersion();
+
+    peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(groupName);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    final ClusterRoleRecord survivor = readLegacyCrr(groupName).getLeft();
+    assertNotNull("Legacy znode must NOT be deleted on PEER CHILD_REMOVED", survivor);
+    assertTrue("Legacy CRR version must not regress after PEER CHILD_REMOVED",
+      survivor.getVersion() >= versionBeforeRemoval);
+  }
+
+  /**
+   * Exercises every {@link PhoenixHAAdmin.LegacyCrrWriteMode} against one legacy znode, in order:
+   * CREATE_NEW, a successful CAS_WITH_VERSION, a CAS_WITH_VERSION that reuses the now outdated
+   * version, CREATE_NEW on the existing znode, FORCE_OVERWRITE, and CAS_WITH_VERSION with
+   * negative versions. Checks the error mapping (BadVersion + NodeExists ->
+   * {@link StaleClusterRoleRecordVersionException}), that FORCE_OVERWRITE is unconditional, and
+   * that negative versions are rejected with {@link IllegalArgumentException}. The writes are
+   * issued sequentially: ZK serializes versioned writes server-side, so the client retry path is
+   * identical to a real race.
+   */
+  @Test
+  public void testLegacyCrrCasErrorMappingAndModeDispatch() throws Exception {
+    String haGroupName = testName.getMethodName();
+
+    // CREATE_NEW writes the first version of the znode; the version argument is not a CAS input.
+    ClusterRoleRecord initial = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER,
+      ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE,
+      this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+    legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, initial,
+      PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0);
+
+    Pair<ClusterRoleRecord, Stat> existing =
+      legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+    assertNotNull(existing.getLeft());
+    int sharedVersion = existing.getRight().getVersion();
+
+    // CAS winner: expects the stat version that was just read, so the write is accepted.
+    ClusterRoleRecord writerA = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER,
+      ClusterRoleRecord.RegistryType.ZK, this.zkUrl,
+      ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, this.peerZKUrl,
+      ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+    legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerA,
+      PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, sharedVersion);
+
+    // CAS loser: same expected version -> BadVersion -> StaleClusterRoleRecordVersionException.
+    ClusterRoleRecord writerB = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER,
+      ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY,
+      this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, 2L);
+    assertThrows(StaleClusterRoleRecordVersionException.class,
+      () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerB,
+        PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, sharedVersion));
+
+    Pair<ClusterRoleRecord, Stat> winner =
+      legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+    assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+      winner.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+    assertTrue(winner.getRight().getVersion() > sharedVersion);
+
+    // CREATE_NEW on an existing znode -> NodeExists -> StaleClusterRoleRecordVersionException.
+    ClusterRoleRecord raceCreate =
+      new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER,
+        ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY,
+        this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 3L);
+    assertThrows(StaleClusterRoleRecordVersionException.class,
+      () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, raceCreate,
+        PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0));
+
+    // FORCE_OVERWRITE bypasses CAS and bumps the stat version.
+    Stat statBeforeOverwrite =
+      legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName).getRight();
+    ClusterRoleRecord overwrite =
+      new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER,
+        ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY,
+        this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 4L);
+    legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, overwrite,
+      PhoenixHAAdmin.LegacyCrrWriteMode.FORCE_OVERWRITE, 0);
+    Pair<ClusterRoleRecord, Stat> afterOverwrite =
+      legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+    assertEquals(ClusterRoleRecord.ClusterRole.STANDBY,
+      afterOverwrite.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+    assertTrue(afterOverwrite.getRight().getVersion() > statBeforeOverwrite.getVersion());
+
+    // CAS_WITH_VERSION rejects negative expectedStatVersion before any ZK call.
+    ClusterRoleRecord illegalRecord =
+      new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER,
+        ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.OFFLINE,
+        this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L);
+    assertThrows(IllegalArgumentException.class,
+      () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalRecord,
+        PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, -1));
+    assertThrows(IllegalArgumentException.class,
+      () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalRecord,
+        PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, Integer.MIN_VALUE));
+  }
+
+  /**
+   * A role change written on the peer side must show up as the peer's role in the local legacy
+   * CRR, with the registry type still ZK.
+   */
+  @Test
+  public void testLegacyCrrSyncPeerRoleFlipUpdatesLegacyRole2() throws Exception {
+    final String groupName = testName.getMethodName();
+    // The peer starts out as STANDBY, seeded before the client exists, so that the very first
+    // sync already records STANDBY for the peer.
+    final HAGroupStoreRecord standbyPeer =
+      new HAGroupStoreRecord("v1.0", groupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
+        HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl,
+        CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L);
+    createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, groupName, standbyPeer);
+
+    // Interval 0 switches the periodic loop off so only the event-driven path is exercised.
+    final Configuration eventsOnlyConf = legacyCrrConf(true, 0);
+    assertNotNull(HAGroupStoreClient.getInstanceForZkUrl(eventsOnlyConf, groupName, zkUrl));
+    awaitLegacyCrrRole(groupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.STANDBY);
+
+    // Rewrite the peer record with a state that maps to the ACTIVE_TO_STANDBY cluster role.
+    final HAGroupStoreRecord flippedPeer = new HAGroupStoreRecord("v1.0", groupName,
+      HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 0L,
+      HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl,
+      CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L);
+    createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, groupName, flippedPeer);
+
+    final ClusterRoleRecord afterFlip = awaitLegacyCrrRole(groupName, ClusterType.PEER,
+      ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).getLeft();
+    assertEquals("Registry type must remain ZK after a peer-driven role flip",
+      ClusterRoleRecord.RegistryType.ZK, afterFlip.getRegistryType());
+  }
+
+  /**
+   * Without a peer record the legacy CRR must report the peer's role as UNKNOWN; once the peer
+   * record appears the role must converge to STANDBY with a higher CRR version.
+   */
+  @Test
+  public void testLegacyCrrSyncPeerAbsentYieldsUnknownAndConvergesOnRecovery() throws Exception {
+    final String groupName = testName.getMethodName();
+    // Nothing is seeded on the peer, so the sync has no peer record to derive a role from and
+    // the peer's role is expected to come out as UNKNOWN.
+    final Configuration eventsOnlyConf = legacyCrrConf(true, 0);
+    assertNotNull(HAGroupStoreClient.getInstanceForZkUrl(eventsOnlyConf, groupName, zkUrl));
+    final long versionWhilePeerUnknown = awaitLegacyCrrRole(groupName, ClusterType.PEER,
+      ClusterRoleRecord.ClusterRole.UNKNOWN).getLeft().getVersion();
+
+    // The peer "recovers" by publishing its consistentHA record; the resulting peer event is
+    // what should make the legacy sync pick up the peer's role.
+    final HAGroupStoreRecord recoveredPeer =
+      new HAGroupStoreRecord("v1.0", groupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
+        HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl,
+        CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L);
+    createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, groupName, recoveredPeer);
+
+    final ClusterRoleRecord converged = awaitLegacyCrrRole(groupName, ClusterType.PEER,
+      ClusterRoleRecord.ClusterRole.STANDBY).getLeft();
+    assertTrue("Version must bump when role2 transitions UNKNOWN -> STANDBY",
+      converged.getVersion() > versionWhilePeerUnknown);
+  }
+
+  /**
+   * registryType stays ZK across multiple sync cycles (never reverts to RPC).
+   * <p>
+   * Writes a sequence of peer states through {@code peerHaAdmin} and, after each write, checks
+   * that the legacy CRR's peer role matches that state's cluster role, that the local role is
+   * still ACTIVE, that the registry type is still ZK, and that the record version is strictly
+   * greater than the one observed in the previous step.
+   */
+  @Test
+  public void testLegacyCrrSyncRegistryTypePreservedAcrossMultipleCycles() 
throws Exception {
+    String haGroupName = testName.getMethodName();
+    Configuration conf = legacyCrrConf(true, 0);
+    HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, 
haGroupName, zkUrl);
+    assertNotNull(client);
+
+    Pair<ClusterRoleRecord, Stat> initial = awaitLegacyCrrPresent(haGroupName);
+    assertEquals(ClusterRoleRecord.RegistryType.ZK, 
initial.getLeft().getRegistryType());
+    assertEquals("Initial local role should be ACTIVE per @Before seed",
+      ClusterRoleRecord.ClusterRole.ACTIVE,
+      initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+    // Baseline for the strictly-increasing version check performed inside the loop.
+    long lastVersion = initial.getLeft().getVersion();
+
+    // Drive a sequence of distinct peer states; each event drives a sync that 
rewrites the
+    // legacy znode (or short-circuits if logically equal). Direct ZK writes 
intentionally
+    // bypass setHAGroupStatusIfNeeded's transition guard.
+    HAGroupStoreRecord.HAGroupState[] cycle =
+      new HAGroupStoreRecord.HAGroupState[] { 
HAGroupStoreRecord.HAGroupState.STANDBY,
+        HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
+        HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE, 
HAGroupStoreRecord.HAGroupState.OFFLINE,
+        HAGroupStoreRecord.HAGroupState.STANDBY };
+    for (HAGroupStoreRecord.HAGroupState state : cycle) {
+      HAGroupStoreRecord peer = new HAGroupStoreRecord("v1.0", haGroupName, 
state, 0L,
+        HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, 
this.peerMasterUrl, this.masterUrl,
+        CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L);
+      createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, 
peer);
+      // Block until the legacy CRR shows the cluster role that corresponds to this peer state.
+      Pair<ClusterRoleRecord, Stat> after =
+        awaitLegacyCrrRole(haGroupName, ClusterType.PEER, 
state.getClusterRole());
+      assertEquals(
+        "Local role must remain ACTIVE across peer-driven cycles (peer state=" 
+ state + ")",
+        ClusterRoleRecord.ClusterRole.ACTIVE,
+        after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+      assertEquals("Registry type must remain ZK after a sync cycle (peer 
state=" + state + ")",
+        ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType());
+      assertTrue(
+        "Logical version must monotonically increase across distinct sync 
cycles (peer state="
+          + state + ")",
+        after.getLeft().getVersion() > lastVersion);
+      // Carry the observed version forward so the next iteration compares against it.
+      lastVersion = after.getLeft().getVersion();
+    }
+  }
+
+  /**
+   * Periodic loop repairs an external divergence with no consistentHA event.
+   * <p>
+   * Overwrites the legacy znode with a record whose local role is STANDBY, whose peer role is
+   * ACTIVE and whose version is 10 above the current one, then polls for up to 40 seconds until
+   * the local role reads ACTIVE again. Finally verifies that the repaired record has a higher
+   * version than the corrupted one and still uses the ZK registry type.
+   */
+  @Test
+  public void testLegacyCrrSyncPeriodicReconciliationRecoversAfterDivergence() 
throws Exception {
+    String haGroupName = testName.getMethodName();
+    Configuration conf = legacyCrrConf(true, 2); // 2s interval; jitter is 
0-30s on first run
+    HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, 
haGroupName, zkUrl);
+    assertNotNull(client);
+    Pair<ClusterRoleRecord, Stat> initial = awaitLegacyCrrPresent(haGroupName);
+    assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE,
+      initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+
+    // Externally corrupt the legacy znode; no consistentHA event fires, so 
only the periodic
+    // reconciler can recover.
+    ClusterRoleRecord corrupt = new ClusterRoleRecord(haGroupName, 
HighAvailabilityPolicy.FAILOVER,
+      ClusterRoleRecord.RegistryType.ZK, this.zkUrl, 
ClusterRoleRecord.ClusterRole.STANDBY,
+      this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, 
initial.getLeft().getVersion() + 10);
+    // The write is issued in CAS_WITH_VERSION mode against the stat version read above.
+    legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, corrupt,
+      PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, 
initial.getRight().getVersion());
+    // Sanity check: the corrupted local role is what is now stored in the znode.
+    Pair<ClusterRoleRecord, Stat> corrupted = readLegacyCrr(haGroupName);
+    assertEquals(ClusterRoleRecord.ClusterRole.STANDBY,
+      corrupted.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+
+    // Worst-case wait: jitter up to 30s + 2s interval; allow 40s.
+    long deadline = System.currentTimeMillis() + 40_000L;
+    Pair<ClusterRoleRecord, Stat> after = readLegacyCrr(haGroupName);
+    // Re-read every 500 ms until the local role is ACTIVE again or the deadline passes.
+    while (
+      (after.getLeft() == null || 
after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))
+          != ClusterRoleRecord.ClusterRole.ACTIVE)
+        && System.currentTimeMillis() < deadline
+    ) {
+      Thread.sleep(500);
+      after = readLegacyCrr(haGroupName);
+    }
+    assertNotNull(after.getLeft());
+    assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE,
+      after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+    // The repaired record must carry a higher version than the corrupted one.
+    assertTrue(after.getLeft().getVersion() > 
corrupted.getLeft().getVersion());
+    assertEquals(ClusterRoleRecord.RegistryType.ZK, 
after.getLeft().getRegistryType());
+  }
+
+  /**
+   * Peer view absent: client preserves the pre-seeded {@code role2} rather than downgrading it
+   * to UNKNOWN. Information from a prior write is more authoritative than a transient gap in the
+   * local peer cache; another RS with peer visibility (or this client once peer recovers) will
+   * overwrite when there is real news.
+   * <p>
+   * The test asserts that, three seconds after the client starts, the peer role is still
+   * OFFLINE and that neither the znode stat version nor the record version has changed.
+   */
+  @Test
+  public void testLegacyCrrSyncPreservesPreSeededRole2WhenPeerMissing() throws 
Exception {
+    String haGroupName = testName.getMethodName();
+    // Pre-seed role2=OFFLINE; do NOT create a peer consistentHA record.
+    ClusterRoleRecord preSeed = new ClusterRoleRecord(haGroupName, 
HighAvailabilityPolicy.FAILOVER,
+      ClusterRoleRecord.RegistryType.ZK, this.zkUrl, 
ClusterRoleRecord.ClusterRole.ACTIVE,
+      this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L);
+    legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, preSeed,
+      PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0);
+    Pair<ClusterRoleRecord, Stat> seeded = readLegacyCrr(haGroupName);
+    assertNotNull(seeded.getLeft());
+    // Remember the znode stat version so any later rewrite of the znode can be detected.
+    int seededStatVersion = seeded.getRight().getVersion();
+
+    Configuration conf = legacyCrrConf(true, 0); // periodic disabled; initial 
sync only
+    HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, 
haGroupName, zkUrl);
+    assertNotNull(client);
+
+    // Allow the initial async sync ample time to run. With peer absent and 
role2=OFFLINE
+    // already in the znode, the equality check must short-circuit and the 
znode must remain
+    // byte-identical.
+    Thread.sleep(3_000);
+
+    Pair<ClusterRoleRecord, Stat> after = readLegacyCrr(haGroupName);
+    assertNotNull(after.getLeft());
+    assertEquals("Pre-seeded role2 must be preserved when peer view is absent",
+      ClusterRoleRecord.ClusterRole.OFFLINE,
+      after.getLeft().getRole(formattedZkUrlFor(ClusterType.PEER)));
+    assertEquals("Znode must not be rewritten when desired record is logically 
equal",
+      seededStatVersion, after.getRight().getVersion());
+    // The record-level version must be untouched as well.
+    assertEquals(seeded.getLeft().getVersion(), after.getLeft().getVersion());
+  }
+
+  /**
+   * When a peer consistentHA record exists, the initial sync replaces a stale pre-seeded
+   * {@code role2} with the role derived from the live peer state and raises the record version.
+   */
+  @Test
+  public void testLegacyCrrSyncOverwritesPreSeededRole2WhenPeerPresent() throws Exception {
+    final String groupName = testName.getMethodName();
+
+    // Seed the legacy znode with a peer role of OFFLINE at version 5.
+    final ClusterRoleRecord staleSeed = new ClusterRoleRecord(groupName,
+      HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, this.zkUrl,
+      ClusterRoleRecord.ClusterRole.ACTIVE, this.peerZKUrl,
+      ClusterRoleRecord.ClusterRole.OFFLINE, 5L);
+    legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(groupName, staleSeed,
+      PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0);
+    final ClusterRoleRecord seededCrr = readLegacyCrr(groupName).getLeft();
+    assertNotNull(seededCrr);
+
+    // Publish a STANDBY peer record before the client starts so the sync sees real peer state.
+    final HAGroupStoreRecord standbyPeer = new HAGroupStoreRecord("v1.0", groupName,
+      HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(),
+      this.zkUrl, this.peerMasterUrl, this.masterUrl, CLUSTERS.getHdfsUrl2(),
+      CLUSTERS.getHdfsUrl1(), 0L);
+    createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, groupName, standbyPeer);
+
+    // Reconciliation interval 0: only the initial sync is expected to run.
+    final Configuration syncConf = legacyCrrConf(true, 0);
+    assertNotNull(HAGroupStoreClient.getInstanceForZkUrl(syncConf, groupName, zkUrl));
+
+    final ClusterRoleRecord syncedCrr = awaitLegacyCrrRole(groupName, ClusterType.PEER,
+      ClusterRoleRecord.ClusterRole.STANDBY).getLeft();
+    assertEquals(ClusterRoleRecord.RegistryType.ZK, syncedCrr.getRegistryType());
+    assertTrue("Version must bump when peer state replaces stale role2",
+      syncedCrr.getVersion() > seededCrr.getVersion());
+  }
+
+  /**
+   * Regression for the apurtell PR-2479 lockout concern. Seeds the legacy znode with the
+   * pre-PHOENIX-7495 JSON shape ({@code zk1}/{@code zk2} keys, no {@code registryType}). Strict
+   * Jackson on this build rejects the unknown {@code zk1}/{@code zk2} fields, so
+   * {@code fromJson} returns empty and the sync path takes the {@code existing=null} +
+   * {@code stat!=null} branch, CAS-overwriting the bytes with a fresh ZK-registry record.
+   * <p>
+   * The test asserts that the record read back afterwards has registry type ZK, a local role of
+   * ACTIVE, a version of at least 1, and a znode stat version of exactly 1.
+   */
+  @Test
+  public void testLegacyCrrSyncMigratesOlderZk1Zk2Record() throws Exception {
+    String haGroupName = testName.getMethodName();
+
+    // \\\\ in the Java literal -> \\ in JSON bytes -> \ in the parsed url 
string.
+    String legacyJson = String.format("{\"haGroupName\":\"%s\"," + 
"\"policy\":\"FAILOVER\","
+      + "\"zk1\":\"legacy-host-1\\\\:2181::/hbase\"," + "\"role1\":\"ACTIVE\","
+      + "\"zk2\":\"legacy-host-2\\\\:2181::/hbase\"," + 
"\"role2\":\"STANDBY\"," + "\"version\":7}",
+      haGroupName);
+
+    // Sanity: this build's strict Jackson cannot decode the pre-PHOENIX-7495 
shape.
+    assertFalse("Pre-PHOENIX-7495 JSON must fail to parse on this build",
+      
ClusterRoleRecord.fromJson(legacyJson.getBytes(StandardCharsets.UTF_8)).isPresent());
+
+    // Seed the legacy znode with the older bytes, then start the client with 
sync on.
+    
legacyHaAdmin.getCurator().create().creatingParentsIfNeeded().forPath(toPath(haGroupName),
+      legacyJson.getBytes(StandardCharsets.UTF_8));
+
+    // NOTE(review): the 60s interval presumably keeps the periodic loop out of the picture so
+    // that the initial sync performs the migration - confirm against the scheduler's jitter.
+    Configuration conf = legacyCrrConf(/* legacyEnabled */ true, /* 
periodicSec */ 60);
+    HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, 
haGroupName, zkUrl);
+    assertNotNull(client);
+
+    // Migration: existing=null (unparseable) but stat populated -> CAS 
overwrite path.
+    Pair<ClusterRoleRecord, Stat> migrated = 
awaitLegacyCrrPresent(haGroupName);
+    ClusterRoleRecord crr = migrated.getLeft();
+
+    assertEquals("Migrated record must carry registryType=ZK", 
ClusterRoleRecord.RegistryType.ZK,
+      crr.getRegistryType());
+    assertEquals("Local role must reflect live consistentHA state (seeded 
ACTIVE)",
+      ClusterRoleRecord.ClusterRole.ACTIVE, 
crr.getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+    // The seeded record's version=7 is lost because the bytes don't parse; 
buildDesiredLegacyCrr
+    // sees existing=null and bases the new version on 
local.getAdminCRRVersion(). Just assert
+    // monotonic-from-zero.
+    assertTrue("Migrated record must have positive version", crr.getVersion() 
>= 1L);
+    // ZK stat reflects an overwrite (CAS at stat.version=0 produced 
stat.version=1).
+    assertEquals("ZK stat version must reflect overwrite", 1, 
migrated.getRight().getVersion());
+  }
+
+  // ---------- Legacy CRR sync test helpers ----------
+
+  /** Configuration clone with the legacy CRR flag and reconciliation interval 
set. */
+  private Configuration legacyCrrConf(boolean legacyEnabled, long periodicSec) 
{
+    Configuration src = CLUSTERS.getHBaseCluster1().getConfiguration();
+    Configuration cloned = new Configuration(src);
+    cloned.setBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, legacyEnabled);
+    cloned.setLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, 
periodicSec);
+    return cloned;
+  }
+
+  private Pair<ClusterRoleRecord, Stat> readLegacyCrr(String haGroupName) 
throws IOException {
+    return legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+  }
+
+  /** Polls the legacy CRR until {@code condition} matches or the propagation 
deadline elapses. */
+  private Pair<ClusterRoleRecord, Stat> awaitLegacyCrr(String haGroupName,
+    Predicate<ClusterRoleRecord> condition, String description) throws 
Exception {
+    long deadline = System.currentTimeMillis() + 
ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS;
+    Pair<ClusterRoleRecord, Stat> legacy = readLegacyCrr(haGroupName);
+    while (
+      (legacy.getLeft() == null || !condition.test(legacy.getLeft()))
+        && System.currentTimeMillis() < deadline
+    ) {
+      Thread.sleep(100);
+      legacy = readLegacyCrr(haGroupName);
+    }
+    assertNotNull("Legacy znode missing while awaiting: " + description, 
legacy.getLeft());
+    assertTrue("Legacy CRR condition not met within timeout: " + description,
+      condition.test(legacy.getLeft()));
+    return legacy;
+  }
+
+  private Pair<ClusterRoleRecord, Stat> awaitLegacyCrrPresent(String 
haGroupName) throws Exception {
+    return awaitLegacyCrr(haGroupName, crr -> true, "znode present");
+  }
+
+  /** Polls until the LOCAL or PEER role in the legacy CRR matches {@code 
expectedRole}. */
+  private Pair<ClusterRoleRecord, Stat> awaitLegacyCrrRole(String haGroupName,
+    ClusterType clusterType, ClusterRoleRecord.ClusterRole expectedRole) 
throws Exception {
+    String url = formattedZkUrlFor(clusterType);
+    return awaitLegacyCrr(haGroupName, crr -> crr.getRole(url) == expectedRole,
+      clusterType + " role == " + expectedRole);
+  }
+
+  /** LOCAL or PEER ZK URL in the canonical ZK-registry form used by the 
legacy sync. */
+  private String formattedZkUrlFor(ClusterType clusterType) {
+    String raw = (clusterType == ClusterType.LOCAL) ? zkUrl : peerZKUrl;
+    return JDBCUtil.formatUrl(raw, ClusterRoleRecord.RegistryType.ZK);
+  }
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
index e6e71d86a3..4a2e43a3b8 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
@@ -229,6 +229,110 @@ public class ClusterRoleRecordTest {
     assertEquals(ClusterRole.UNKNOWN, role);
   }
 
+  /** A fixture lacking the {@code registryType} field is deserialized with registry type RPC. */
+  @Test
+  public void testFromJsonWithoutRegistryTypeDefaultsToRpc() throws IOException {
+    Optional<ClusterRoleRecord> parsed =
+      ClusterRoleRecord.fromJson(readFile("json/test_role_record_no_registry_type.json"));
+    assertTrue(parsed.isPresent());
+
+    // The remaining fields of the fixture must come through unchanged.
+    ClusterRoleRecord crr = parsed.get();
+    assertEquals(ClusterRoleRecord.RegistryType.RPC, crr.getRegistryType());
+    assertEquals(HighAvailabilityPolicy.FAILOVER, crr.getPolicy());
+    assertEquals(7L, crr.getVersion());
+    assertEquals(ClusterRole.ACTIVE, crr.getRole1());
+    assertEquals(ClusterRole.STANDBY, crr.getRole2());
+  }
+
+  /** Explicit {@code registryType=RPC} must round-trip as RPC. */
+  @Test
+  public void testFromJsonExplicitRpcRegistryTypeRoundTrips() throws 
IOException {
+    Optional<ClusterRoleRecord> opt =
+      
ClusterRoleRecord.fromJson(readFile("json/test_role_record_explicit_rpc.json"));
+    assertTrue(opt.isPresent());
+    assertEquals(ClusterRoleRecord.RegistryType.RPC, 
opt.get().getRegistryType());
+    assertEquals(11L, opt.get().getVersion());
+  }
+
+  /** Explicit {@code registryType=ZK} round-trips as ZK. */
+  @Test
+  public void testFromJsonExplicitZkRegistryTypeRoundTrips() throws 
IOException {
+    Optional<ClusterRoleRecord> opt =
+      
ClusterRoleRecord.fromJson(readFile("json/test_role_record_explicit_zk.json"));
+    assertTrue(opt.isPresent());
+    assertEquals(ClusterRoleRecord.RegistryType.ZK, 
opt.get().getRegistryType());
+    assertEquals(13L, opt.get().getVersion());
+  }
+
+  /**
+   * Serializing a ZK-registry record with {@code toJson} and parsing it back with
+   * {@code fromJson} yields a record that is equal to the original and still has type ZK.
+   */
+  @Test
+  public void testToFromJsonPreservesZkRegistryTypeAcrossRoundTrip() throws IOException {
+    final String groupName = testName.getMethodName();
+    final ClusterRoleRecord original = new ClusterRoleRecord(groupName,
+      HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase",
+      ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.STANDBY, 42L);
+
+    final Optional<ClusterRoleRecord> decoded =
+      ClusterRoleRecord.fromJson(ClusterRoleRecord.toJson(original));
+    assertTrue(decoded.isPresent());
+
+    final ClusterRoleRecord roundTripped = decoded.get();
+    assertEquals(ClusterRoleRecord.RegistryType.ZK, roundTripped.getRegistryType());
+    assertEquals(original, roundTripped);
+  }
+
+  // Tests for isLogicallyEqualIgnoringVersionAndRegistry
+
+  /** A record is never logically equal to {@code null}. */
+  @Test
+  public void testLogicalEquality_nullOther() {
+    final ClusterRoleRecord record = new ClusterRoleRecord("g", HighAvailabilityPolicy.FAILOVER,
+      ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", ClusterRole.ACTIVE,
+      "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L);
+    final boolean equalToNull = record.isLogicallyEqualIgnoringVersionAndRegistry(null);
+    assertFalse(equalToNull);
+  }
+
+  /** Records that differ only in version are logically equal, in both directions. */
+  @Test
+  public void testLogicalEquality_sameFieldsDifferentVersion() {
+    final String firstUrl = "zk1\\:2181::/hbase";
+    final String secondUrl = "zk2\\:2181::/hbase";
+    final ClusterRoleRecord versionOne = new ClusterRoleRecord("g",
+      HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, firstUrl,
+      ClusterRole.ACTIVE, secondUrl, ClusterRole.STANDBY, 1L);
+    final ClusterRoleRecord versionFortyTwo = new ClusterRoleRecord("g",
+      HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, firstUrl,
+      ClusterRole.ACTIVE, secondUrl, ClusterRole.STANDBY, 42L);
+
+    assertTrue(versionOne.isLogicallyEqualIgnoringVersionAndRegistry(versionFortyTwo));
+    assertTrue(versionFortyTwo.isLogicallyEqualIgnoringVersionAndRegistry(versionOne));
+  }
+
+  /**
+   * A ZK-form record and an RPC-form record with the same roles but different URLs are not
+   * logically equal.
+   */
+  @Test
+  public void testLogicalEquality_zkVsRpcWithDifferentUrlForms() {
+    // Roles match on both sides; only the URL form (and registry type) differs, and the URL
+    // difference alone is enough to make the comparison fail.
+    final ClusterRoleRecord zkRecord = new ClusterRoleRecord("g",
+      HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase",
+      ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.STANDBY, 1L);
+    final ClusterRoleRecord rpcRecord = new ClusterRoleRecord("g",
+      HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.RPC, "master1\\:16000",
+      ClusterRole.ACTIVE, "master2\\:16000", ClusterRole.STANDBY, 1L);
+
+    final boolean logicallyEqual = zkRecord.isLogicallyEqualIgnoringVersionAndRegistry(rpcRecord);
+    assertFalse(logicallyEqual);
+  }
+
+  /** Records that differ in the second cluster's role are not logically equal. */
+  @Test
+  public void testLogicalEquality_differentRole() {
+    final String firstUrl = "zk1\\:2181::/hbase";
+    final String secondUrl = "zk2\\:2181::/hbase";
+    final ClusterRoleRecord standbyPeer = new ClusterRoleRecord("g",
+      HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, firstUrl,
+      ClusterRole.ACTIVE, secondUrl, ClusterRole.STANDBY, 1L);
+    final ClusterRoleRecord offlinePeer = new ClusterRoleRecord("g",
+      HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, firstUrl,
+      ClusterRole.ACTIVE, secondUrl, ClusterRole.OFFLINE, 1L);
+
+    final boolean logicallyEqual =
+      standbyPeer.isLogicallyEqualIgnoringVersionAndRegistry(offlinePeer);
+    assertFalse(logicallyEqual);
+  }
+
+  /** Records that differ only in HA group name are not logically equal. */
+  @Test
+  public void testLogicalEquality_differentHaGroupName() {
+    final String firstUrl = "zk1\\:2181::/hbase";
+    final String secondUrl = "zk2\\:2181::/hbase";
+    final ClusterRoleRecord groupOne = new ClusterRoleRecord("g1",
+      HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, firstUrl,
+      ClusterRole.ACTIVE, secondUrl, ClusterRole.STANDBY, 1L);
+    final ClusterRoleRecord groupTwo = new ClusterRoleRecord("g2",
+      HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, firstUrl,
+      ClusterRole.ACTIVE, secondUrl, ClusterRole.STANDBY, 1L);
+
+    final boolean logicallyEqual = groupOne.isLogicallyEqualIgnoringVersionAndRegistry(groupTwo);
+    assertFalse(logicallyEqual);
+  }
+
   // Private Helper Methods
 
   private ClusterRoleRecord getClusterRoleRecord(String name, 
HighAvailabilityPolicy policy,
diff --git 
a/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json 
b/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json
new file mode 100644
index 0000000000..e2b7269b48
--- /dev/null
+++ b/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json
@@ -0,0 +1,10 @@
+{
+  "haGroupName" : "testFromJsonExplicitRpcRoundTrips",
+  "policy" : "FAILOVER",
+  "registryType" : "RPC",
+  "url1" : "url1\\:2181",
+  "role1" : "ACTIVE",
+  "url2" : "url2\\:2181",
+  "role2" : "STANDBY",
+  "version" : 11
+}
diff --git 
a/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json 
b/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json
new file mode 100644
index 0000000000..1e9ce174ef
--- /dev/null
+++ b/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json
@@ -0,0 +1,10 @@
+{
+  "haGroupName" : "testFromJsonExplicitZkRoundTrips",
+  "policy" : "FAILOVER",
+  "registryType" : "ZK",
+  "url1" : "zk1\\:2181::/hbase",
+  "role1" : "ACTIVE",
+  "url2" : "zk2\\:2181::/hbase",
+  "role2" : "STANDBY",
+  "version" : 13
+}
diff --git 
a/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json 
b/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json
new file mode 100644
index 0000000000..517e061b65
--- /dev/null
+++ 
b/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json
@@ -0,0 +1,9 @@
+{
+  "haGroupName" : "testFromJsonWithoutRegistryTypeDefaultsToRpc",
+  "policy" : "FAILOVER",
+  "url1" : "url1\\:2181",
+  "role1" : "ACTIVE",
+  "url2" : "url2\\:2181",
+  "role2" : "STANDBY",
+  "version" : 7
+}

Reply via email to