This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 941a31029c PHOENIX-7787 Make CCF HAGroupStore ZK Updates backward
compatible with existing ZK based client (#2479)
941a31029c is described below
commit 941a31029c9e695e10fb7433d8ecebd5cda254d3
Author: ritegarg <[email protected]>
AuthorDate: Fri May 29 13:54:52 2026 -0700
PHOENIX-7787 Make CCF HAGroupStore ZK Updates backward compatible with
existing ZK based client (#2479)
* PHOENIX-7787 Make CCF HAGroupStore ZK Updates backward compatible with
existing ZK based client
* PHOENIX-7787 Look up preserved peer role by URL in buildDesiredLegacyCrr
ClusterRoleRecord's ctor canonicalizes url1/url2 by lexical order, so
existing.getRole2() returns the role for whichever URL sorts larger -
not necessarily the peer URL. When the local peer cache was empty and we
tried to preserve the existing peer role, we would inherit the local
role instead whenever the local URL sorted larger; the equality check
would then see a "logical change" and trigger a redundant CAS write.
Manifested as flaky failures (dependent on the alphabetical order of the
deployment's ZK URLs) in
HAGroupStoreClientIT.testLegacyCrrSyncPreservesPreSeededRole2WhenPeerMissing
and testLegacyCrrSyncStateOnlyChangeDoesNotRewriteLegacy under the full
IT suite; with the fix, two consecutive full-verify runs were clean
(40/40).
Co-authored-by: Cursor <[email protected]>
* PHOENIX-7787 Apply spotless formatting
Pure formatting changes from `mvn spotless:apply` across the touched files;
no behavior change.
Generated-by: Cursor (Claude).
Co-authored-by: Cursor <[email protected]>
* PHOENIX-7787 Address PR review comments
Co-authored-by: Cursor <[email protected]>
---------
Co-authored-by: Ritesh Garg
<[email protected]>
Co-authored-by: Cursor <[email protected]>
---
.../StaleClusterRoleRecordVersionException.java | 35 ++
.../org/apache/phoenix/jdbc/ClusterRoleRecord.java | 70 ++-
.../apache/phoenix/jdbc/HAGroupStoreClient.java | 288 ++++++++++-
.../org/apache/phoenix/jdbc/PhoenixHAAdmin.java | 94 +++-
.../org/apache/phoenix/query/QueryServices.java | 10 +
.../apache/phoenix/query/QueryServicesOptions.java | 14 +-
.../org/apache/phoenix/schema/MetaDataClient.java | 6 +-
.../apache/phoenix/jdbc/HAGroupStoreClientIT.java | 558 ++++++++++++++++++++-
.../apache/phoenix/jdbc/ClusterRoleRecordTest.java | 104 ++++
.../json/test_role_record_explicit_rpc.json | 10 +
.../json/test_role_record_explicit_zk.json | 10 +
.../json/test_role_record_no_registry_type.json | 9 +
12 files changed, 1169 insertions(+), 39 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java
new file mode 100644
index 0000000000..e15a716fa6
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+/**
+ * Thrown when a CAS write to the legacy {@code /phoenix/ha} CRR znode fails (BadVersion or
+ * NodeExists). It is a checked exception; the caller can re-read the znode and retry if
+ * needed. Analog of {@link StaleHAGroupStoreRecordVersionException}.
+ */
+public class StaleClusterRoleRecordVersionException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public StaleClusterRoleRecordVersionException(String msg) {
+ super(msg);
+ }
+
+ public StaleClusterRoleRecordVersionException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
index 0ba6b312d7..7e5b597fb9 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Objects;
import java.util.Optional;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -126,31 +127,50 @@ public class ClusterRoleRecord {
private final long version;
/**
- * To handle backward compatibility with old ClusterRoleRecords which had
zk1 and zk2 as keys for
- * zk urls, This constructor is only being used {@link
ClusterRoleRecord#fromJson} when we
- * deserialize Cluster Role Record read from ZooKeeper ZNode. If CRR is in
old format we will read
- * zk1 and zk2 and url1 and url2 will be null and if it is in new format zk1
and zk2 will be null
- * in both cases final url is being stored in url1 and url2 url will be
stored in normalized forms
- * which looks like zk1\\:port1,zk2\\:port2,zk3\\:port3,
zk4\\:port4,zk5\\:port5::znode or
- * master1\\:port1,master2\\:port2,master3\\:port3,
master4\\:port4,master5\\:port5
+ * Convenience constructor: delegates to the canonical constructor with {@code registryType}
+ * defaulted to {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). Use the explicit overload
+ * below to write {@link RegistryType#ZK} records for the legacy {@code /phoenix/ha} znode.
* @param haGroupName HighAvailability Group name / CRR name
- * @param policy Policy used by give CRR
- * @param url1 ZK/HMaster url based on registry type for first cluster
- * @param role1 {@link ClusterRole} which describes the current state
of first cluster
- * @param url2 ZK/HMaster url based on registry type for second
cluster
- * @param role2 {@link ClusterRole} which describes the current state
of second cluster
- * @param version version of a given CRR
+ * @param policy Policy used by the given CRR
+ * @param url1 ZK/HMaster url for the first cluster
+ * @param role1 {@link ClusterRole} describing the current state of the first cluster
+ * @param url2 ZK/HMaster url for the second cluster
+ * @param role2 {@link ClusterRole} describing the current state of the second cluster
+ * @param version monotonic version of this CRR
+ */
+ public ClusterRoleRecord(String haGroupName, HighAvailabilityPolicy policy,
String url1,
+ ClusterRole role1, String url2, ClusterRole role2, long version) {
+ this(haGroupName, policy, DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE, url1,
role1, url2, role2,
+ version);
+ }
+
+ /**
+ * Canonical constructor; also the {@code @JsonCreator} entry point so the
persisted
+ * {@code registryType} round-trips correctly. Records persisted before
{@code registryType} was
+ * added as a JSON field pass {@code null} here and default to
+ * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). URLs are normalized
to the
+ * {@code registryType}-specific canonical form for accurate comparisons;
the resulting
+ * {@code url1}/{@code url2} are stored as {@code
zk1\:port1,zk2\:port2,...::znode} for ZK or
+ * {@code master1\:port1,master2\:port2,...} for RPC/MASTER.
+ * @param haGroupName HighAvailability Group name / CRR name
+ * @param policy Policy used by the given CRR
+ * @param registryType {@link RegistryType} for URL normalization; {@code
null} defaults to
+ * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC)
+ * @param url1 ZK/HMaster url for the first cluster
+ * @param role1 {@link ClusterRole} describing the current state of
the first cluster
+ * @param url2 ZK/HMaster url for the second cluster
+ * @param role2 {@link ClusterRole} describing the current state of
the second cluster
+ * @param version monotonic version of this CRR
*/
@JsonCreator
public ClusterRoleRecord(@JsonProperty("haGroupName") String haGroupName,
- @JsonProperty("policy") HighAvailabilityPolicy policy,
@JsonProperty("url1") String url1,
+ @JsonProperty("policy") HighAvailabilityPolicy policy,
+ @JsonProperty("registryType") RegistryType registryType,
@JsonProperty("url1") String url1,
@JsonProperty("role1") ClusterRole role1, @JsonProperty("url2") String
url2,
@JsonProperty("role2") ClusterRole role2, @JsonProperty("version") long
version) {
this.haGroupName = haGroupName;
this.policy = policy;
-
- // Default registry type is RPC from Consistent Cluster Failover onwards
- this.registryType = DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE;
+ this.registryType = registryType != null ? registryType :
DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE;
// Do we really need to normalize here ?
// We are normalizing to have urls in specific formats for each
registryType for getting
@@ -221,6 +241,22 @@ public class ClusterRoleRecord {
return haGroupName.equals(other.haGroupName) &&
policy.equals(other.policy);
}
+ /**
+ * Logical equality on the six identity/role fields ({@code haGroupName, policy, url1, url2,
+ * role1, role2}); ignores {@code version} (always bumps) and the {@code registryType} field
+ * itself. Same-registry callers only: {@code url1}/{@code url2} are normalized at construction
+ * per {@code registryType}, so cross-registry records will not compare equal even for the same
+ * underlying host:port.
+ * @param other the record to compare against; may be {@code null}
+ * @return {@code true} if all six fields match; {@code false} otherwise or if {@code other} is
+ * {@code null}
+ */
+ public boolean isLogicallyEqualIgnoringVersionAndRegistry(ClusterRoleRecord
other) {
+ if (other == null) {
+ return false;
+ }
+ return Objects.equals(haGroupName, other.haGroupName) &&
Objects.equals(policy, other.policy)
+ && Objects.equals(url1, other.url1) && Objects.equals(url2, other.url2)
+ && role1 == other.role1 && role2 == other.role2;
+ }
+
/** Returns true if CRR has any url in UNKNOWN role/state. */
public boolean hasUnknownRole() {
return role1 == ClusterRole.UNKNOWN || role2 == ClusterRole.UNKNOWN;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index fe299d14fb..9d2c268eae 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.jdbc;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+import static
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_1;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1;
@@ -34,7 +35,11 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_2;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
import static
org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS;
+import static
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS;
+import static
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
@@ -56,17 +61,20 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
+import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException;
import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType;
@@ -80,6 +88,7 @@ import org.slf4j.LoggerFactory;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors;
/**
* Main implementation of HAGroupStoreClient with peer support. Write-through
cache for HAGroupStore
@@ -98,8 +107,13 @@ public class HAGroupStoreClient implements Closeable {
public static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1;
// Maximum jitter in seconds for sync job start time (10 seconds)
private static final long SYNC_JOB_MAX_JITTER_SECONDS = 10;
+ // Exclusive upper bound for initial-delay jitter on the periodic reconciler
(0..30s).
+ private static final long LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS = 31;
private PhoenixHAAdmin phoenixHaAdmin;
private PhoenixHAAdmin peerPhoenixHaAdmin;
+ // Admin + NodeCache on /phoenix/ha; null when feature disabled.
+ private volatile PhoenixHAAdmin legacyHaAdmin;
+ private volatile NodeCache legacyCrrNodeCache;
private static final Logger LOGGER =
LoggerFactory.getLogger(HAGroupStoreClient.class);
// Map of <ZKUrl, <HAGroupName, HAGroupStoreClientInstance>>
private static final Map<String, ConcurrentHashMap<String,
HAGroupStoreClient>> instances =
@@ -132,6 +146,12 @@ public class HAGroupStoreClient implements Closeable {
CopyOnWriteArraySet<HAGroupStateListener>> targetStateSubscribers = new
ConcurrentHashMap<>();
// Scheduled executor for periodic sync job
private ScheduledExecutorService syncExecutor;
+ // Legacy CRR sync state. All invocations of syncLegacyCRRIfRoleChanged go
through the
+ // single-threaded legacyCrrSyncExecutor (initial sync, periodic, and
event-driven), so
+ // calls are naturally serialized; close races are handled via local
snapshots of mutable
+ // fields inside the method. No additional lock is needed.
+ private final boolean legacyCrrSyncEnabled;
+ private volatile ScheduledExecutorService legacyCrrSyncExecutor;
public static HAGroupStoreClient getInstance(Configuration conf, String
haGroupName)
throws SQLException {
@@ -164,8 +184,15 @@ public class HAGroupStoreClient implements Closeable {
result.close();
result = null;
} else {
- instances.putIfAbsent(localZkUrl, new ConcurrentHashMap<>());
- instances.get(localZkUrl).put(haGroupName, result);
+ // Atomic register; pairs with deregisterFromInstances()'s
computeIfPresent.
+ final HAGroupStoreClient created = result;
+ instances.compute(localZkUrl, (k, v) -> {
+ if (v == null) {
+ v = new ConcurrentHashMap<>();
+ }
+ v.put(haGroupName, created);
+ return v;
+ });
}
}
}
@@ -224,6 +251,8 @@ public class HAGroupStoreClient implements Closeable {
conf.getLong(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT) *
ZK_SESSION_TIMEOUT_MULTIPLIER);
this.rotationTimeMs =
conf.getLong(QueryServices.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
QueryServicesOptions.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+ this.legacyCrrSyncEnabled =
conf.getBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED,
+ DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED);
// Custom Event Listener
this.peerCustomPathChildrenCacheListener = peerPathChildrenCacheListener;
try {
@@ -246,6 +275,12 @@ public class HAGroupStoreClient implements Closeable {
// Start periodic sync job
startPeriodicSyncJob();
+ // Opt-in legacy /phoenix/ha sync. Setup failures propagate so the
client is marked
+ // unhealthy rather than silently dropping the legacy znode out of sync.
+ if (legacyCrrSyncEnabled && this.isHealthy) {
+ setupLegacyCrrSync();
+ }
+
} catch (Exception e) {
this.isHealthy = false;
close();
@@ -886,9 +921,22 @@ public class HAGroupStoreClient implements Closeable {
if (cacheType == ClusterType.LOCAL) {
maybeInitializePeerPathChildrenCache();
}
+ // Offload the legacy CRR sync (it does ZK + JDBC I/O) so we don't
block
+ // Curator's per-namespace event dispatcher.
+ ScheduledExecutorService syncExec = legacyCrrSyncExecutor;
+ if (syncExec != null) {
+ try {
+ syncExec.execute(this::syncLegacyCRRIfRoleChanged);
+ } catch (RejectedExecutionException ree) {
+ // Executor already shutting down (close() race); drop
silently.
+ LOGGER.debug("Legacy CRR sync skipped for HA group {}:
executor shut down",
+ haGroupName);
+ }
+ }
}
break;
case CHILD_REMOVED:
+ // No-op: the legacy /phoenix/ha znode is never deleted by this
client.
break;
case INITIALIZED:
latch.countDown();
@@ -981,34 +1029,66 @@ public class HAGroupStoreClient implements Closeable {
}
/**
- * Shuts down the periodic sync executor gracefully.
+ * Removes this instance from the static {@link #instances} map. Idempotent. Uses value-based
+ * remove so that, if a concurrent {@link #getInstanceForZkUrl} has already swapped in a fresh
+ * replacement, the replacement is preserved. The outer-CHM {@code computeIfPresent} below pairs
+ * with the {@code compute} in {@link #getInstanceForZkUrl} to serialize bucket creation/removal
+ * on the same key. Returns without touching the map when no ZK URL can be resolved or no bucket
+ * exists for it.
*/
+ private void deregisterFromInstances() {
+ final String key = (this.zkUrl != null) ? this.zkUrl :
getLocalZkUrl(this.conf);
+ if (key == null) {
+ return;
+ }
+ final ConcurrentHashMap<String, HAGroupStoreClient> bucket =
instances.get(key);
+ if (bucket == null) {
+ return;
+ }
+ bucket.remove(this.haGroupName, this); // removes only if the entry still maps to this instance
+ instances.computeIfPresent(key, (k, v) -> v.isEmpty() ? null : v); // null drops an empty bucket
+ }
+
private void shutdownSyncExecutor() {
if (syncExecutor != null) {
- syncExecutor.shutdown();
- try {
- if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
- syncExecutor.shutdownNow();
- }
- } catch (InterruptedException e) {
- syncExecutor.shutdownNow();
- Thread.currentThread().interrupt();
- }
+ MoreExecutors.shutdownAndAwaitTermination(syncExecutor, 5,
TimeUnit.SECONDS); // Guava helper; replaces the manual shutdown/await/shutdownNow sequence
syncExecutor = null;
}
}
+ private void shutdownLegacyCrrSyncExecutor() { // legacy-CRR counterpart of shutdownSyncExecutor()
+ if (legacyCrrSyncExecutor != null) {
+ MoreExecutors.shutdownAndAwaitTermination(legacyCrrSyncExecutor, 5,
TimeUnit.SECONDS);
+ legacyCrrSyncExecutor = null; // the cache listener snapshots this field and skips when null
+ }
+ }
+
@Override
public void close() {
try {
LOGGER.info("Closing HAGroupStoreClient");
- // Shutdown sync executor
+ // Mark unhealthy and deregister from the static cache first so any
concurrent
+ // getInstanceForZkUrl() does not hand out a half-closed instance.
+ isHealthy = false;
+ deregisterFromInstances();
+ // Executors -> caches -> admins. Null-before-close on legacy resources
so a racing
+ // listener sees either a live or null reference, never half-closed.
shutdownSyncExecutor();
+ shutdownLegacyCrrSyncExecutor();
if (pathChildrenCache != null) {
pathChildrenCache.close();
pathChildrenCache = null;
}
closePeerConnection();
+ NodeCache nodeCache = this.legacyCrrNodeCache;
+ this.legacyCrrNodeCache = null;
+ if (nodeCache != null) {
+ nodeCache.close();
+ }
+ PhoenixHAAdmin admin = this.legacyHaAdmin;
+ this.legacyHaAdmin = null;
+ if (admin != null) {
+ admin.close();
+ }
LOGGER.info("Closed HAGroupStoreClient");
} catch (IOException e) {
LOGGER.error("Exception closing HAGroupStoreClient", e);
@@ -1047,6 +1127,188 @@ public class HAGroupStoreClient implements Closeable {
return Math.max(0, remainingTime);
}
+ // ========== Legacy /phoenix/ha CRR Sync ==========
+
+ /**
+ * Derives the combined CRR from local + peer records and CAS-writes it to {@code /phoenix/ha}.
+ * CAS losses are logged and skipped; the next consistentHA cache event or periodic cycle
+ * reconverges. Returns without writing when the feature is disabled, the client is unhealthy,
+ * the legacy admin/cache are not set, the local record or peer ZK URL is missing, the existing
+ * record is not in ZK registry form, or the desired record already matches the existing one.
+ * Any other exception is logged at WARN and not rethrown.
+ */
+ private void syncLegacyCRRIfRoleChanged() {
+ if (!legacyCrrSyncEnabled || !isHealthy) {
+ return;
+ }
+ // Snapshot mutable resources up front so a concurrent close() can't null
them mid-method
+ // and trigger NPEs / writes through a torn-down Curator client.
+ final PhoenixHAAdmin admin = this.legacyHaAdmin;
+ final NodeCache cache = this.legacyCrrNodeCache;
+ if (admin == null || cache == null) {
+ return;
+ }
+ try {
+ HAGroupStoreRecord local = getHAGroupStoreRecord();
+ if (local == null) {
+ LOGGER.debug("Skipping legacy CRR sync for HA group {}: no local
consistentHA record",
+ haGroupName);
+ return;
+ }
+ // Wait for peer URL before building the desired CRR (ctor NPEs on null
url2).
+ if (StringUtils.isBlank(local.getPeerZKUrl())) {
+ LOGGER.debug("Skipping legacy CRR sync for HA group {}: peer ZK URL is
blank", haGroupName);
+ return;
+ }
+ HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer();
+ // NodeCache is eventually consistent; on apparent absence, fall back to
an authoritative
+ // ZK read so the equality check and CAS both see consistent state.
+ Pair<ClusterRoleRecord, Stat> snapshot = readLegacyCrrSnapshot(cache);
+ if (snapshot.getRight() == null) {
+ snapshot = admin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+ }
+ ClusterRoleRecord existing = snapshot.getLeft();
+ Stat existingStat = snapshot.getRight();
+ if (!shouldWriteLegacyCrr(existing)) {
+ return;
+ }
+ ClusterRoleRecord desired = buildDesiredLegacyCrr(local, peer, existing);
+ if (desired.isLogicallyEqualIgnoringVersionAndRegistry(existing)) {
+ LOGGER.debug("Legacy CRR for HA group {} already up to date at version
{}", haGroupName,
+ existing.getVersion());
+ return;
+ }
+ try {
+ if (existingStat == null) {
+ admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+ PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, /* ignored */ 0);
+ } else {
+ admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+ PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION,
existingStat.getVersion());
+ }
+ LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})",
haGroupName,
+ existing != null ? existing.getVersion() : -1L,
desired.getVersion());
+ } catch (StaleClusterRoleRecordVersionException stale) {
+ // CAS lost (another RS won); next event/periodic cycle reconverges.
DEBUG: N-1 RSes
+ // log this per transition.
+ LOGGER.debug("Legacy CRR CAS lost for HA group {} at expected stat
version {}", haGroupName,
+ existingStat != null ? existingStat.getVersion() : -1);
+ }
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Legacy CRR sync failed for HA group {}; will be retried by next
event/periodic cycle",
+ haGroupName, e);
+ }
+ }
+
+ /**
+ * Policy gate before issuing a CAS write to the legacy CRR. Returns {@code false} when the
+ * existing record must not be overwritten by this client; a refusal is logged at WARN.
+ * @param existing the legacy CRR currently read, or {@code null} if none
+ * @return {@code true} if {@code existing} is {@code null} or has registry type
+ * {@link RegistryType#ZK}; {@code false} otherwise
+ */
+ private boolean shouldWriteLegacyCrr(ClusterRoleRecord existing) {
+ // Refuse to overwrite a non-ZK (admin-managed RPC) legacy CRR; live
readers use its
+ // registryType to build connection strings, so swapping form would break
them.
+ if (existing != null && existing.getRegistryType() != RegistryType.ZK) {
+ LOGGER.warn("Skipping legacy CRR sync for HA group {}: existing
registryType={} "
+ + "(requires admin migration to ZK form)", haGroupName,
existing.getRegistryType());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Builds the desired legacy CRR, always stamped {@link RegistryType#ZK}. When the local client's
+ * peer view is unavailable, preserves the {@code existing} record's peer role rather than
+ * downgrading it to {@link ClusterRole#UNKNOWN} — another RS with peer visibility would otherwise
+ * keep flipping it back, and the equality check naturally short-circuits when no other field
+ * changed. With neither a peer record nor an existing record, the peer role is
+ * {@link ClusterRole#UNKNOWN}.
+ * <p>
+ * Look up the preserved role by the peer URL (not by {@code getRole2()}) since
+ * {@link ClusterRoleRecord} canonicalizes {@code url1}/{@code url2} by alphabetical order; the
+ * peer URL may end up in either slot depending on lexical comparison.
+ * <p>
+ * The new version is one more than the largest of the existing record's version and the local
+ * and peer admin CRR versions (absent values count as 0).
+ * @param local local HA group store record; must not be {@code null}
+ * @param peer peer HA group store record, or {@code null} if unavailable
+ * @param existing current legacy CRR, or {@code null} if none was read
+ * @return the CRR this client wants in the legacy znode
+ */
+ private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local,
HAGroupStoreRecord peer,
+ ClusterRoleRecord existing) {
+ final ClusterRole role2;
+ if (peer != null) {
+ role2 = peer.getClusterRole();
+ } else if (existing != null) {
+ role2 = existing.getRole(JDBCUtil.formatUrl(local.getPeerZKUrl(),
RegistryType.ZK));
+ } else {
+ role2 = ClusterRole.UNKNOWN;
+ }
+ long peerAdminVersion = (peer != null) ? peer.getAdminCRRVersion() : 0L;
+ long baseVersion = Math.max(existing != null ? existing.getVersion() : 0L,
+ Math.max(local.getAdminCRRVersion(), peerAdminVersion));
+ return new ClusterRoleRecord(haGroupName,
HighAvailabilityPolicy.valueOf(local.getPolicy()),
+ RegistryType.ZK, this.zkUrl, local.getClusterRole(),
local.getPeerZKUrl(), role2,
+ baseVersion + 1);
+ }
+
+ /**
+ * NodeCache snapshot of the legacy CRR. {@code (null, null)} on cache miss; callers must confirm
+ * absence with an authoritative ZK read since the cache is eventually consistent. Caller passes
+ * the cache from a local snapshot, never the mutable field (which {@link #close()} may null at
+ * any time). If the cached bytes do not yield a record from {@code ClusterRoleRecord.fromJson},
+ * the record is {@code null} while the stat is still returned.
+ * @param cache the NodeCache to read from; must not be {@code null}
+ * @return pair of (record, stat) taken from the cache's current data
+ */
+ private Pair<ClusterRoleRecord, Stat> readLegacyCrrSnapshot(NodeCache cache)
{
+ ChildData current = cache.getCurrentData();
+ if (current == null) {
+ return Pair.of(null, null);
+ }
+ ClusterRoleRecord record =
ClusterRoleRecord.fromJson(current.getData()).orElse(null);
+ return Pair.of(record, current.getStat());
+ }
+
+ /**
+ * Initialize legacy {@code /phoenix/ha} sync: admin handle, NodeCache, single-thread executor, an
+ * off-lock initial sync, and the periodic reconciler. Called only when the feature is enabled and
+ * the client is healthy. The cache rebuild and the initial sync are queued, in that order, on the
+ * single-threaded executor; a failed rebuild is logged at DEBUG and does not prevent the sync.
+ * @throws Exception if creating the admin or creating/starting the NodeCache fails
+ */
+ private void setupLegacyCrrSync() throws Exception {
+ this.legacyHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf,
PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE);
+ this.legacyCrrNodeCache = new NodeCache(this.legacyHaAdmin.getCurator(),
toPath(haGroupName));
+ // Async start; rebuild() below warms the cache off-lock.
+ this.legacyCrrNodeCache.start(false);
+ this.legacyCrrSyncExecutor = Executors.newSingleThreadScheduledExecutor(r
-> {
+ Thread t = new Thread(r, "HAGroupStoreClient-LegacyCRRSync-" +
haGroupName);
+ t.setDaemon(true);
+ return t;
+ });
+ // Warm NodeCache and run initial sync off-thread so neither blocks the
static
+ // HAGroupStoreClient.class monitor held by getInstanceForZkUrl on ZK/JDBC
I/O. The sync's
+ // empty-snapshot fallback handles the race where it runs before rebuild()
lands.
+ final NodeCache cacheRef = this.legacyCrrNodeCache;
+ this.legacyCrrSyncExecutor.execute(() -> {
+ try {
+ cacheRef.rebuild();
+ } catch (Exception e) {
+ LOGGER.debug("Legacy CRR cache rebuild failed for HA group {}",
haGroupName, e);
+ }
+ });
+ this.legacyCrrSyncExecutor.execute(this::syncLegacyCRRIfRoleChanged);
+ startLegacyCrrReconciliation();
+ }
+
+ /**
+ * Schedules the periodic reconciler on {@code legacyCrrSyncExecutor}; no-op when
+ * {@code intervalSec <= 0}. The first run is delayed by a random jitter in
+ * {@code [0, LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS)} seconds, after which the task repeats
+ * every {@code intervalSec} seconds. Each run catches and logs any {@link Throwable}.
+ */
+ private void startLegacyCrrReconciliation() {
+ long intervalSec =
conf.getLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS,
+ DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS);
+ if (intervalSec <= 0) {
+ LOGGER.info("Legacy CRR periodic reconciliation disabled (interval={}s)
for HA group {}",
+ intervalSec, haGroupName);
+ return;
+ }
+ long jitterSec =
+ ThreadLocalRandom.current().nextLong(0,
LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS);
+ LOGGER.info("Starting legacy CRR reconciliation for HA group {} with
initial delay {}s, "
+ + "then every {}s", haGroupName, jitterSec, intervalSec);
+ legacyCrrSyncExecutor.scheduleAtFixedRate(() -> {
+ try {
+ syncLegacyCRRIfRoleChanged();
+ } catch (Throwable t) {
+ LOGGER.warn("Periodic legacy CRR reconciliation failed for HA group
{}", haGroupName, t);
+ }
+ }, jitterSec, intervalSec, TimeUnit.SECONDS);
+ }
+
// ========== HA Group State Change Subscription Methods ==========
/**
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
index f0fa718952..0eaa35e163 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
@@ -39,6 +39,7 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException;
import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException;
import org.apache.phoenix.util.JDBCUtil;
import org.apache.zookeeper.CreateMode;
@@ -83,6 +84,25 @@ public class PhoenixHAAdmin implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(PhoenixHAAdmin.class);
+ /** ZK's wildcard for {@code setData()/delete()}: bypasses version check. */
+ static final int ZK_MATCH_ANY_VERSION = -1;
+
+ /**
+ * Write mode for {@link #createOrUpdateClusterRoleRecordWithCAS}. The accompanying
+ * {@code expectedStatVersion} argument is interpreted only for {@link #CAS_WITH_VERSION}.
+ * <p>
+ * Phoenix-internal. Not part of any public API contract — values may be added or renamed without
+ * notice. Do not switch exhaustively from outside this module.
+ */
+ public enum LegacyCrrWriteMode {
+ /** Create the znode; no prior version expected. */
+ CREATE_NEW,
+ /**
+ * Unconditional overwrite (no CAS); the write uses
+ * {@link PhoenixHAAdmin#ZK_MATCH_ANY_VERSION}. For operator/migration tooling only.
+ */
+ FORCE_OVERWRITE,
+ /** CAS update; {@code expectedStatVersion} must be {@code >= 0}. */
+ CAS_WITH_VERSION
+ }
+
/** The fully qualified ZK URL for an HBase cluster in format
host:port:/hbase */
private final String zkUrl;
/** Configuration of this command line tool. */
@@ -524,17 +544,18 @@ public class PhoenixHAAdmin implements Closeable {
}
/**
- * Gets the HAGroupStoreRecord and Stat from ZooKeeper.
+ * Gets the HAGroupStoreRecord and Stat from ZooKeeper. Reads (record, stat)
atomically via
+ * {@code storingStatIn} so the returned stat version always corresponds to
the returned bytes.
* @param haGroupName the HA group name
- * @return a pair of HAGroupStoreRecord and Stat
+ * @return a pair of HAGroupStoreRecord and Stat; both {@code null} if the
znode does not exist
* @throws IOException if any error occurs during the retrieval
*/
public Pair<HAGroupStoreRecord, Stat>
getHAGroupStoreRecordInZooKeeper(String haGroupName)
throws IOException {
try {
- byte[] data = getCurator().getData().forPath(toPath(haGroupName));
+ Stat stat = new Stat();
+ byte[] data =
getCurator().getData().storingStatIn(stat).forPath(toPath(haGroupName));
HAGroupStoreRecord record =
HAGroupStoreRecord.fromJson(data).orElse(null);
- Stat stat = getCurator().checkExists().forPath(toPath(haGroupName));
return Pair.of(record, stat);
} catch (KeeperException.NoNodeException nne) {
LOG.warn("No HAGroupStoreRecord for HA group {} in ZK", haGroupName,
nne);
@@ -559,6 +580,71 @@ public class PhoenixHAAdmin implements Closeable {
}
}
+ // ----- Legacy /phoenix/ha ClusterRoleRecord sync helpers -----
+
+ /**
+ * Atomic read of (record, stat) on the legacy CRR znode. Returns {@code (null, null)} if the
+ * znode does not exist.
+ */
+ public Pair<ClusterRoleRecord, Stat>
getClusterRoleRecordAndStatInZooKeeper(String haGroupName)
+ throws IOException {
+ try {
+ Stat stat = new Stat();
+ byte[] data =
getCurator().getData().storingStatIn(stat).forPath(toPath(haGroupName));
+ ClusterRoleRecord record = ClusterRoleRecord.fromJson(data).orElse(null);
+ return Pair.of(record, stat);
+ } catch (KeeperException.NoNodeException nne) {
+ return Pair.of(null, null);
+ } catch (Exception e) {
+ LOG.error("Failed to get ClusterRoleRecord for HA group {}",
haGroupName, e);
+ throw new IOException("Failed to get ClusterRoleRecord for HA group " +
haGroupName, e);
+ }
+ }
+
+ /**
+ * Writes {@code newRecord} per {@code mode}. {@code expectedStatVersion} is used only for
+ * {@link LegacyCrrWriteMode#CAS_WITH_VERSION} (must be {@code >= 0}). Both BadVersion and
+ * NodeExists surface as {@link StaleClusterRoleRecordVersionException}.
+ */
+ public void createOrUpdateClusterRoleRecordWithCAS(String haGroupName,
+ ClusterRoleRecord newRecord, LegacyCrrWriteMode mode, int
expectedStatVersion)
+ throws IOException, StaleClusterRoleRecordVersionException {
+ Preconditions.checkNotNull(mode, "mode");
+ if (mode == LegacyCrrWriteMode.CAS_WITH_VERSION) {
+ Preconditions.checkArgument(expectedStatVersion >= 0,
+ "CAS_WITH_VERSION requires expectedStatVersion >= 0; got " +
expectedStatVersion);
+ }
+ try {
+ byte[] data = ClusterRoleRecord.toJson(newRecord);
+ switch (mode) {
+ case CREATE_NEW:
+
getCurator().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+ .forPath(toPath(haGroupName), data);
+ break;
+ case FORCE_OVERWRITE:
+
getCurator().setData().withVersion(ZK_MATCH_ANY_VERSION).forPath(toPath(haGroupName),
+ data);
+ break;
+ case CAS_WITH_VERSION:
+
getCurator().setData().withVersion(expectedStatVersion).forPath(toPath(haGroupName),
+ data);
+ break;
+ default:
+ throw new IllegalStateException("Unhandled LegacyCrrWriteMode: " +
mode);
+ }
+ } catch (KeeperException.BadVersionException e) {
+ throw new StaleClusterRoleRecordVersionException(
+ "CAS failed for HA group " + haGroupName + " at expectedStatVersion "
+ expectedStatVersion,
+ e);
+ } catch (KeeperException.NodeExistsException e) {
+ throw new StaleClusterRoleRecordVersionException(
+ "Create failed for HA group " + haGroupName + ": node already exists",
e);
+ } catch (Exception e) {
+ LOG.error("Failed to write ClusterRoleRecord for HA group {}",
haGroupName, e);
+ throw new IOException("Failed to write ClusterRoleRecord for HA group "
+ haGroupName, e);
+ }
+ }
+
public String getZkUrl() {
return zkUrl;
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 72075853aa..42514d7329 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -647,6 +647,16 @@ public interface QueryServices extends SQLCloseable {
// HA Group Store sync job interval in seconds
String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS =
"phoenix.ha.group.store.sync.interval.seconds";
+ // "CRR" = Cluster Role Record. Master switch for syncing the legacy /phoenix/ha cluster
+ // role record from /phoenix/consistentHA. When false, no legacy znode is read, written, or
+ // deleted by HAGroupStoreClient.
+ String PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED =
"phoenix.ha.legacy.crr.sync.enabled";
+
+ // Periodic reconciliation interval for the legacy /phoenix/ha cluster role record sync, in
+ // seconds. 0 disables the periodic loop only; event-driven sync still runs.
+ String PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS =
+ "phoenix.ha.legacy.crr.reconciliation.interval.seconds";
+
String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
"phoenix.replication.log.rotation.time.ms";
/**
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 339b6763a4..3e99b9fff6 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -92,6 +92,8 @@ import static
org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_AT
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED;
+import static
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS;
+import static
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED;
import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB;
@@ -517,6 +519,13 @@ public class QueryServicesOptions {
// Default HA Group Store sync job interval in seconds (15 minutes = 900
seconds)
public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900;
+ // Legacy /phoenix/ha CRR sync is opt-in (default off).
+ public static final boolean DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED =
false;
+
+ // Periodic reconciliation interval for legacy /phoenix/ha CRR sync, in seconds.
+ // 0 disables the periodic loop only; event-driven sync still runs.
+ public static final long
DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS = 60L;
+
public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 *
1000L;
private final Configuration config;
@@ -643,7 +652,10 @@ public class QueryServicesOptions {
.setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS,
DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS)
.setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED,
- DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED);
+ DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED)
+ .setIfUnset(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED,
DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED)
+ .setIfUnset(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS,
+ DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index edb38da970..1f57187a2c 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -4738,9 +4738,9 @@ public class MetaDataClient {
/**
* To check if TTL is defined at any of the child below we are
checking it at
* {@link
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List,
ColumnMutator, int, PTable, PTable, boolean)}
- * level where in function
- * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[],
byte[], byte[], List, int)}
- * we are already traversing through allDescendantViews.
+ * level where in function {@link
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
+ * validateIfMutationAllowedOnParent(PTable, List, PTableType, long,
byte[], byte[],
+ * byte[], List, int)} we are already traversing through
allDescendantViews.
*/
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index ba66787674..8997fc2bc4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -18,9 +18,12 @@
package org.apache.phoenix.jdbc;
import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE;
+import static
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS;
+import static
org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED;
import static
org.apache.phoenix.replication.reader.ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
@@ -34,6 +37,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -47,13 +51,16 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
+import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException;
import org.apache.phoenix.util.HAGroupStoreTestUtil;
+import org.apache.phoenix.util.JDBCUtil;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
@@ -72,6 +79,8 @@ public class HAGroupStoreClientIT extends HABaseIT {
private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 5000L;
private PhoenixHAAdmin haAdmin;
private PhoenixHAAdmin peerHaAdmin;
+ // Admin on the legacy /phoenix/ha namespace; used to inspect/seed/corrupt
the legacy znode.
+ private PhoenixHAAdmin legacyHaAdmin;
private String zkUrl;
private String peerZKUrl;
private String masterUrl;
@@ -95,8 +104,11 @@ public class HAGroupStoreClientIT extends HABaseIT {
ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE);
peerHaAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE);
+ legacyHaAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(),
+ PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE);
haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
+
legacyHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
zkUrl = getLocalZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration());
// Clean existing records in system table
List<String> haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl);
@@ -117,8 +129,10 @@ public class HAGroupStoreClientIT extends HABaseIT {
public void after() throws Exception {
haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
+
legacyHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
haAdmin.close();
peerHaAdmin.close();
+ legacyHaAdmin.close();
}
@Test
@@ -752,7 +766,6 @@ public class HAGroupStoreClientIT extends HABaseIT {
@Test
public void testSetHAGroupStatusIfNeededWithUnhealthyClient() throws
Exception {
String haGroupName = testName.getMethodName();
-
HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient
.getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(),
haGroupName, zkUrl);
@@ -1203,4 +1216,547 @@ public class HAGroupStoreClientIT extends HABaseIT {
assertTrue("Sync executor should be shutdown after close",
syncExecutor.isShutdown());
}
}
+
+ // ============================================================================================
+ // Legacy /phoenix/ha CRR sync tests
+ // Verify feature-flag gating, derivation, monotonic version, registry-type preservation,
+ // deletion mirroring, and short-circuit behavior of HAGroupStoreClient's legacy sync path.
+ // ============================================================================================
+
+ @Test
+ public void testLegacyCrrSyncFeatureOffByDefault_NoLegacyZnodeWritten()
throws Exception {
+ String haGroupName = testName.getMethodName();
+ Configuration conf = legacyCrrConf(/* legacyEnabled */ false, /*
periodicSec */ 60);
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ Pair<ClusterRoleRecord, Stat> legacy = readLegacyCrr(haGroupName);
+ assertNull("Legacy CRR must not exist when feature is off",
legacy.getLeft());
+ assertNull("Legacy znode stat must be null when feature is off",
legacy.getRight());
+ }
+
+ @Test
+ public void
testLegacyCrrSyncFeatureOn_InitialSyncCreatesZkRegistryLegacyZnode()
+ throws Exception {
+ String haGroupName = testName.getMethodName();
+ Configuration conf = legacyCrrConf(true, 60);
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+
+ Pair<ClusterRoleRecord, Stat> legacy = awaitLegacyCrrPresent(haGroupName);
+ ClusterRoleRecord crr = legacy.getLeft();
+ assertEquals("Legacy CRR must use ZK registry type for backward
compatibility",
+ ClusterRoleRecord.RegistryType.ZK, crr.getRegistryType());
+ assertEquals(haGroupName, crr.getHaGroupName());
+ assertEquals(HighAvailabilityPolicy.FAILOVER, crr.getPolicy());
+ // Local cluster role is ACTIVE per System Table seed.
+ assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE,
+ crr.getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+ assertTrue("CRR version must be > 0 after initial sync", crr.getVersion()
> 0);
+ }
+
+ @Test
+ public void testLegacyCrrSyncRoleChangePropagatesAndIsNewerThanWorks()
throws Exception {
+ String haGroupName = testName.getMethodName();
+ Configuration conf = legacyCrrConf(true, 60);
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+
+ ClusterRoleRecord initial = awaitLegacyCrrPresent(haGroupName).getLeft();
+ long initialVersion = initial.getVersion();
+
+ // ACTIVE_IN_SYNC -> ACTIVE_IN_SYNC_TO_STANDBY (role change ACTIVE ->
ACTIVE_TO_STANDBY).
+
client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+ Pair<ClusterRoleRecord, Stat> updated = awaitLegacyCrrRole(haGroupName,
ClusterType.LOCAL,
+ ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY);
+ ClusterRoleRecord updatedCRR = updated.getLeft();
+ assertTrue("Version must monotonically increase after role change",
+ updatedCRR.getVersion() > initialVersion);
+ assertTrue("isNewerThan must return true for the updated record",
+ updatedCRR.isNewerThan(initial));
+ assertEquals(ClusterRoleRecord.RegistryType.ZK,
updatedCRR.getRegistryType());
+ }
+
+ @Test
+ public void testLegacyCrrSyncStateOnlyChangeDoesNotRewriteLegacy() throws
Exception {
+ String haGroupName = testName.getMethodName();
+ Configuration conf = legacyCrrConf(true, 60);
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+
+ Pair<ClusterRoleRecord, Stat> initial = awaitLegacyCrrPresent(haGroupName);
+ int initialZkVersion = initial.getRight().getVersion();
+ long initialCrrVersion = initial.getLeft().getVersion();
+
+ // ACTIVE_IN_SYNC -> ACTIVE_NOT_IN_SYNC: ClusterRole stays ACTIVE.
+
client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ Pair<ClusterRoleRecord, Stat> after = readLegacyCrr(haGroupName);
+ assertNotNull(after.getLeft());
+ assertEquals("Legacy CRR ZK stat version must not change on state-only
transitions",
+ initialZkVersion, after.getRight().getVersion());
+ assertEquals("Legacy CRR logical version must not change on state-only
transitions",
+ initialCrrVersion, after.getLeft().getVersion());
+ }
+
+ /** LOCAL CHILD_REMOVED on consistentHA does not delete the legacy znode. */
+ @Test
+ public void testLegacyCrrSyncLocalChildRemovedDoesNotDeleteLegacy() throws
Exception {
+ String haGroupName = testName.getMethodName();
+ Configuration conf = legacyCrrConf(true, 60);
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+ Pair<ClusterRoleRecord, Stat> initial = awaitLegacyCrrPresent(haGroupName);
+ long initialVersion = initial.getLeft().getVersion();
+
+ haAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
+
+ // Wait long enough for any potential event-driven delete to have fired.
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ Pair<ClusterRoleRecord, Stat> after = readLegacyCrr(haGroupName);
+ assertNotNull("Legacy znode must NOT be deleted on LOCAL CHILD_REMOVED",
after.getLeft());
+ assertTrue("Legacy CRR version must not regress after LOCAL CHILD_REMOVED",
+ after.getLeft().getVersion() >= initialVersion);
+ }
+
+ @Test
+ public void testLegacyCrrSyncPeriodicDisabledStillSyncsViaEvents() throws
Exception {
+ String haGroupName = testName.getMethodName();
+ Configuration conf = legacyCrrConf(true, 0); // periodic disabled
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+ ClusterRoleRecord initial = awaitLegacyCrrPresent(haGroupName).getLeft();
+ assertEquals("Initial local role should be ACTIVE per @Before seed",
+ ClusterRoleRecord.ClusterRole.ACTIVE,
initial.getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+ assertEquals("Initial registry type must be ZK",
ClusterRoleRecord.RegistryType.ZK,
+ initial.getRegistryType());
+
+
client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+ ClusterRoleRecord updated = awaitLegacyCrrRole(haGroupName,
ClusterType.LOCAL,
+ ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).getLeft();
+ assertTrue("Updated version must monotonically advance past the initial
version",
+ updated.getVersion() > initial.getVersion());
+ assertTrue("isNewerThan must return true for the post-event record",
+ updated.isNewerThan(initial));
+ assertEquals("Registry type must remain ZK after an event-driven sync",
+ ClusterRoleRecord.RegistryType.ZK, updated.getRegistryType());
+ }
+
+ /** PEER CHILD_REMOVED on consistentHA does not delete the legacy znode. */
+ @Test
+ public void testLegacyCrrSyncPeerChildRemovedDoesNotDeleteLegacy() throws
Exception {
+ String haGroupName = testName.getMethodName();
+ // Seed a peer record so that PEER cache initializes and PEER
CHILD_REMOVED can fire later.
+ HAGroupStoreRecord peerRecord =
+ new HAGroupStoreRecord("v1.0", haGroupName,
HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
+ HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl,
this.peerMasterUrl, this.masterUrl,
+ CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L);
+ createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName,
peerRecord);
+
+ Configuration conf = legacyCrrConf(true, 0); // periodic disabled to
isolate event behavior
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+
+ Pair<ClusterRoleRecord, Stat> initial = awaitLegacyCrrPresent(haGroupName);
+ long initialCrrVersion = initial.getLeft().getVersion();
+
+ peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ Pair<ClusterRoleRecord, Stat> after = readLegacyCrr(haGroupName);
+ assertNotNull("Legacy znode must NOT be deleted on PEER CHILD_REMOVED",
after.getLeft());
+ assertTrue("Legacy CRR version must not regress after PEER CHILD_REMOVED",
+ after.getLeft().getVersion() >= initialCrrVersion);
+ }
+
+ /**
+ * Each {@link PhoenixHAAdmin.LegacyCrrWriteMode}: error mapping (BadVersion + NodeExists ->
+ * {@link StaleClusterRoleRecordVersionException}), unconditional FORCE_OVERWRITE, and
+ * CAS_WITH_VERSION rejecting negative versions. Sequential: ZK serializes versioned writes
+ * server-side, so the client retry path is identical to a real race.
+ */
+ @Test
+ public void testLegacyCrrCasErrorMappingAndModeDispatch() throws Exception {
+ String haGroupName = testName.getMethodName();
+
+ // Create.
+ ClusterRoleRecord initial = new ClusterRoleRecord(haGroupName,
HighAvailabilityPolicy.FAILOVER,
+ ClusterRoleRecord.RegistryType.ZK, this.zkUrl,
ClusterRoleRecord.ClusterRole.ACTIVE,
+ this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, initial,
+ PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0);
+
+ Pair<ClusterRoleRecord, Stat> existing =
+ legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+ assertNotNull(existing.getLeft());
+ int sharedVersion = existing.getRight().getVersion();
+
+ // CAS winner.
+ ClusterRoleRecord writerA = new ClusterRoleRecord(haGroupName,
HighAvailabilityPolicy.FAILOVER,
+ ClusterRoleRecord.RegistryType.ZK, this.zkUrl,
+ ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, this.peerZKUrl,
+ ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+ legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerA,
+ PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, sharedVersion);
+
+ // CAS loser: same expected version -> BadVersion ->
StaleClusterRoleRecordVersionException.
+ ClusterRoleRecord writerB = new ClusterRoleRecord(haGroupName,
HighAvailabilityPolicy.FAILOVER,
+ ClusterRoleRecord.RegistryType.ZK, this.zkUrl,
ClusterRoleRecord.ClusterRole.STANDBY,
+ this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, 2L);
+ assertThrows(StaleClusterRoleRecordVersionException.class,
+ () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName,
writerB,
+ PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, sharedVersion));
+
+ Pair<ClusterRoleRecord, Stat> winner =
+ legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+ assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ winner.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+ assertTrue(winner.getRight().getVersion() > sharedVersion);
+
+ // CREATE_NEW on an existing znode -> NodeExists ->
StaleClusterRoleRecordVersionException.
+ ClusterRoleRecord raceCreate =
+ new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER,
+ ClusterRoleRecord.RegistryType.ZK, this.zkUrl,
ClusterRoleRecord.ClusterRole.STANDBY,
+ this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 3L);
+ assertThrows(StaleClusterRoleRecordVersionException.class,
+ () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName,
raceCreate,
+ PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0));
+
+ // FORCE_OVERWRITE bypasses CAS and bumps the stat version.
+ Stat statBeforeOverwrite =
+
legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName).getRight();
+ ClusterRoleRecord overwrite =
+ new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER,
+ ClusterRoleRecord.RegistryType.ZK, this.zkUrl,
ClusterRoleRecord.ClusterRole.STANDBY,
+ this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 4L);
+ legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName,
overwrite,
+ PhoenixHAAdmin.LegacyCrrWriteMode.FORCE_OVERWRITE, 0);
+ Pair<ClusterRoleRecord, Stat> afterOverwrite =
+ legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+ assertEquals(ClusterRoleRecord.ClusterRole.STANDBY,
+ afterOverwrite.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+ assertTrue(afterOverwrite.getRight().getVersion() >
statBeforeOverwrite.getVersion());
+
+ // CAS_WITH_VERSION rejects negative expectedStatVersion before any ZK
call.
+ ClusterRoleRecord illegalRecord =
+ new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER,
+ ClusterRoleRecord.RegistryType.ZK, this.zkUrl,
ClusterRoleRecord.ClusterRole.OFFLINE,
+ this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L);
+ assertThrows(IllegalArgumentException.class,
+ () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName,
illegalRecord,
+ PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, -1));
+ assertThrows(IllegalArgumentException.class,
+ () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName,
illegalRecord,
+ PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION,
Integer.MIN_VALUE));
+ }
+
+ /** Peer-side role flip propagates to role2 in the local legacy CRR. */
+ @Test
+ public void testLegacyCrrSyncPeerRoleFlipUpdatesLegacyRole2() throws
Exception {
+ String haGroupName = testName.getMethodName();
+ // Seed peer with STANDBY before client starts so the initial sync sees
role2=STANDBY.
+ HAGroupStoreRecord peerStandby =
+ new HAGroupStoreRecord("v1.0", haGroupName,
HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
+ HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl,
this.peerMasterUrl, this.masterUrl,
+ CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L);
+ createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName,
peerStandby);
+
+ Configuration conf = legacyCrrConf(true, 0); // periodic disabled to
isolate event-driven path
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+ awaitLegacyCrrRole(haGroupName, ClusterType.PEER,
ClusterRoleRecord.ClusterRole.STANDBY);
+
+ // Flip the peer record to a state whose cluster role is ACTIVE_TO_STANDBY.
+ HAGroupStoreRecord peerFlipped = new HAGroupStoreRecord("v1.0",
haGroupName,
+ HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 0L,
+ HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl,
this.peerMasterUrl, this.masterUrl,
+ CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L);
+ createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName,
peerFlipped);
+
+ Pair<ClusterRoleRecord, Stat> after = awaitLegacyCrrRole(haGroupName,
ClusterType.PEER,
+ ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY);
+ assertEquals("Registry type must remain ZK after a peer-driven role flip",
+ ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType());
+ }
+
+ /** Absent peer record yields role2=UNKNOWN; converges when the peer record appears. */
+ @Test
+ public void testLegacyCrrSyncPeerAbsentYieldsUnknownAndConvergesOnRecovery()
throws Exception {
+ String haGroupName = testName.getMethodName();
+ // No peer record seeded: peer cache is empty so
getHAGroupStoreRecordFromPeer() returns null
+ // and role2 falls through to UNKNOWN.
+ Configuration conf = legacyCrrConf(true, 0);
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+ Pair<ClusterRoleRecord, Stat> initial =
+ awaitLegacyCrrRole(haGroupName, ClusterType.PEER,
ClusterRoleRecord.ClusterRole.UNKNOWN);
+ long initialVersion = initial.getLeft().getVersion();
+
+ // Peer "recovers" by writing its consistentHA record. The PEER
CHILD_ADDED event triggers
+ // the legacy sync to update role2.
+ HAGroupStoreRecord peerRecord =
+ new HAGroupStoreRecord("v1.0", haGroupName,
HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
+ HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl,
this.peerMasterUrl, this.masterUrl,
+ CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L);
+ createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName,
peerRecord);
+
+ Pair<ClusterRoleRecord, Stat> recovered =
+ awaitLegacyCrrRole(haGroupName, ClusterType.PEER,
ClusterRoleRecord.ClusterRole.STANDBY);
+ assertTrue("Version must bump when role2 transitions UNKNOWN -> STANDBY",
+ recovered.getLeft().getVersion() > initialVersion);
+ }
+
+ /** registryType stays ZK across multiple sync cycles (never reverts to RPC). */
+ @Test
+ public void testLegacyCrrSyncRegistryTypePreservedAcrossMultipleCycles()
throws Exception {
+ String haGroupName = testName.getMethodName();
+ Configuration conf = legacyCrrConf(true, 0);
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+
+ Pair<ClusterRoleRecord, Stat> initial = awaitLegacyCrrPresent(haGroupName);
+ assertEquals(ClusterRoleRecord.RegistryType.ZK,
initial.getLeft().getRegistryType());
+ assertEquals("Initial local role should be ACTIVE per @Before seed",
+ ClusterRoleRecord.ClusterRole.ACTIVE,
+ initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+ long lastVersion = initial.getLeft().getVersion();
+
+ // Drive a sequence of distinct peer states; each event drives a sync that
rewrites the
+ // legacy znode (or short-circuits if logically equal). Direct ZK writes
intentionally
+ // bypass setHAGroupStatusIfNeeded's transition guard.
+ HAGroupStoreRecord.HAGroupState[] cycle =
+ new HAGroupStoreRecord.HAGroupState[] {
HAGroupStoreRecord.HAGroupState.STANDBY,
+ HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
+ HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE,
HAGroupStoreRecord.HAGroupState.OFFLINE,
+ HAGroupStoreRecord.HAGroupState.STANDBY };
+ for (HAGroupStoreRecord.HAGroupState state : cycle) {
+ HAGroupStoreRecord peer = new HAGroupStoreRecord("v1.0", haGroupName,
state, 0L,
+ HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl,
this.peerMasterUrl, this.masterUrl,
+ CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L);
+ createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName,
peer);
+ Pair<ClusterRoleRecord, Stat> after =
+ awaitLegacyCrrRole(haGroupName, ClusterType.PEER,
state.getClusterRole());
+ assertEquals(
+ "Local role must remain ACTIVE across peer-driven cycles (peer state="
+ state + ")",
+ ClusterRoleRecord.ClusterRole.ACTIVE,
+ after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+ assertEquals("Registry type must remain ZK after a sync cycle (peer
state=" + state + ")",
+ ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType());
+ assertTrue(
+ "Logical version must monotonically increase across distinct sync
cycles (peer state="
+ + state + ")",
+ after.getLeft().getVersion() > lastVersion);
+ lastVersion = after.getLeft().getVersion();
+ }
+ }
+
+ /** Periodic loop repairs an external divergence with no consistentHA event. */
+ @Test
+ public void testLegacyCrrSyncPeriodicReconciliationRecoversAfterDivergence()
throws Exception {
+ String haGroupName = testName.getMethodName();
+ Configuration conf = legacyCrrConf(true, 2); // 2s interval; jitter is
0-30s on first run
+ HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ assertNotNull(client);
+ Pair<ClusterRoleRecord, Stat> initial = awaitLegacyCrrPresent(haGroupName);
+ assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE,
+ initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+
+ // Externally corrupt the legacy znode; no consistentHA event fires, so
only the periodic
+ // reconciler can recover.
+ ClusterRoleRecord corrupt = new ClusterRoleRecord(haGroupName,
HighAvailabilityPolicy.FAILOVER,
+ ClusterRoleRecord.RegistryType.ZK, this.zkUrl,
ClusterRoleRecord.ClusterRole.STANDBY,
+ this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE,
initial.getLeft().getVersion() + 10);
+ legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, corrupt,
+ PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION,
initial.getRight().getVersion());
+ Pair<ClusterRoleRecord, Stat> corrupted = readLegacyCrr(haGroupName);
+ assertEquals(ClusterRoleRecord.ClusterRole.STANDBY,
+ corrupted.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+
+ // Worst-case wait: jitter up to 30s + 2s interval; allow 40s.
+ long deadline = System.currentTimeMillis() + 40_000L;
+ Pair<ClusterRoleRecord, Stat> after = readLegacyCrr(haGroupName);
+ while (
+ (after.getLeft() == null ||
after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))
+ != ClusterRoleRecord.ClusterRole.ACTIVE)
+ && System.currentTimeMillis() < deadline
+ ) {
+ Thread.sleep(500);
+ after = readLegacyCrr(haGroupName);
+ }
+ assertNotNull(after.getLeft());
+ assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE,
+ after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+ assertTrue(after.getLeft().getVersion() >
corrupted.getLeft().getVersion());
+ assertEquals(ClusterRoleRecord.RegistryType.ZK,
after.getLeft().getRegistryType());
+ }
+
+ /**
+  * With no peer view available, the client must keep the pre-seeded {@code role2} instead of
+  * downgrading it to UNKNOWN. A value left by an earlier write is treated as more authoritative
+  * than a transient gap in the local peer cache; another RS that can see the peer (or this
+  * client once the peer is back) will overwrite it when there is real news.
+  */
+ @Test
+ public void testLegacyCrrSyncPreservesPreSeededRole2WhenPeerMissing() throws Exception {
+   String haGroupName = testName.getMethodName();
+
+   // Seed the legacy znode with role2=OFFLINE. Deliberately no peer consistentHA record.
+   ClusterRoleRecord seedRecord = new ClusterRoleRecord(
+     haGroupName,
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     this.zkUrl,
+     ClusterRoleRecord.ClusterRole.ACTIVE,
+     this.peerZKUrl,
+     ClusterRoleRecord.ClusterRole.OFFLINE,
+     5L);
+   legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, seedRecord,
+     PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0);
+   Pair<ClusterRoleRecord, Stat> before = readLegacyCrr(haGroupName);
+   assertNotNull(before.getLeft());
+   int statVersionBefore = before.getRight().getVersion();
+
+   // periodic disabled; initial sync only
+   Configuration syncConf = legacyCrrConf(true, 0);
+   HAGroupStoreClient storeClient =
+     HAGroupStoreClient.getInstanceForZkUrl(syncConf, haGroupName, zkUrl);
+   assertNotNull(storeClient);
+
+   // Give the initial async sync plenty of time. The peer is absent and role2=OFFLINE is
+   // already stored, so the equality check must short-circuit and leave the znode untouched.
+   Thread.sleep(3_000);
+
+   Pair<ClusterRoleRecord, Stat> afterSync = readLegacyCrr(haGroupName);
+   ClusterRoleRecord recordAfter = afterSync.getLeft();
+   assertNotNull(recordAfter);
+   assertEquals("Pre-seeded role2 must be preserved when peer view is absent",
+     ClusterRoleRecord.ClusterRole.OFFLINE,
+     recordAfter.getRole(formattedZkUrlFor(ClusterType.PEER)));
+   assertEquals("Znode must not be rewritten when desired record is logically equal",
+     statVersionBefore, afterSync.getRight().getVersion());
+   assertEquals(before.getLeft().getVersion(), recordAfter.getVersion());
+ }
+
+ /**
+  * With a peer view available, the initial sync must replace a stale pre-seeded {@code role2}
+  * with the live peer state and bump the record version while doing so.
+  */
+ @Test
+ public void testLegacyCrrSyncOverwritesPreSeededRole2WhenPeerPresent() throws Exception {
+   String haGroupName = testName.getMethodName();
+
+   // Seed the legacy znode with role2=OFFLINE.
+   ClusterRoleRecord seedRecord = new ClusterRoleRecord(
+     haGroupName,
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     this.zkUrl,
+     ClusterRoleRecord.ClusterRole.ACTIVE,
+     this.peerZKUrl,
+     ClusterRoleRecord.ClusterRole.OFFLINE,
+     5L);
+   legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, seedRecord,
+     PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0);
+   Pair<ClusterRoleRecord, Stat> before = readLegacyCrr(haGroupName);
+   assertNotNull(before.getLeft());
+
+   // Publish a peer consistentHA record (STANDBY) so the sync has real peer state to read.
+   HAGroupStoreRecord peerStoreRecord = new HAGroupStoreRecord(
+     "v1.0",
+     haGroupName,
+     HAGroupStoreRecord.HAGroupState.STANDBY,
+     0L,
+     HighAvailabilityPolicy.FAILOVER.toString(),
+     this.zkUrl,
+     this.peerMasterUrl,
+     this.masterUrl,
+     CLUSTERS.getHdfsUrl2(),
+     CLUSTERS.getHdfsUrl1(),
+     0L);
+   createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerStoreRecord);
+
+   // periodic disabled; initial sync only
+   Configuration syncConf = legacyCrrConf(true, 0);
+   HAGroupStoreClient storeClient =
+     HAGroupStoreClient.getInstanceForZkUrl(syncConf, haGroupName, zkUrl);
+   assertNotNull(storeClient);
+
+   Pair<ClusterRoleRecord, Stat> afterSync = awaitLegacyCrrRole(haGroupName, ClusterType.PEER,
+     ClusterRoleRecord.ClusterRole.STANDBY);
+   ClusterRoleRecord recordAfter = afterSync.getLeft();
+   assertEquals(ClusterRoleRecord.RegistryType.ZK, recordAfter.getRegistryType());
+   assertTrue("Version must bump when peer state replaces stale role2",
+     recordAfter.getVersion() > before.getLeft().getVersion());
+ }
+
+ /**
+  * Regression for the lockout concern raised on PR-2479. The legacy znode is seeded with the
+  * pre-PHOENIX-7495 JSON shape ({@code zk1}/{@code zk2} keys, no {@code registryType}), which
+  * this build cannot parse (asserted below via {@code fromJson}). Per the original description,
+  * the sync path then sees {@code existing=null} with a non-null stat and CAS-overwrites the
+  * bytes with a fresh ZK-registry record.
+  */
+ @Test
+ public void testLegacyCrrSyncMigratesOlderZk1Zk2Record() throws Exception {
+   String haGroupName = testName.getMethodName();
+
+   // \\\\ in the Java literal -> \\ in JSON bytes -> \ in the parsed url string.
+   String legacyJson = String.format(
+     "{\"haGroupName\":\"%s\","
+       + "\"policy\":\"FAILOVER\","
+       + "\"zk1\":\"legacy-host-1\\\\:2181::/hbase\","
+       + "\"role1\":\"ACTIVE\","
+       + "\"zk2\":\"legacy-host-2\\\\:2181::/hbase\","
+       + "\"role2\":\"STANDBY\","
+       + "\"version\":7}",
+     haGroupName);
+   byte[] legacyBytes = legacyJson.getBytes(StandardCharsets.UTF_8);
+
+   // Sanity check: the pre-PHOENIX-7495 shape is not decodable on this build.
+   assertFalse("Pre-PHOENIX-7495 JSON must fail to parse on this build",
+     ClusterRoleRecord.fromJson(legacyBytes).isPresent());
+
+   // Write the older bytes into the legacy znode first, then start a client with sync enabled.
+   legacyHaAdmin.getCurator().create().creatingParentsIfNeeded()
+     .forPath(toPath(haGroupName), legacyBytes);
+
+   Configuration syncConf = legacyCrrConf(/* legacyEnabled */ true, /* periodicSec */ 60);
+   HAGroupStoreClient storeClient =
+     HAGroupStoreClient.getInstanceForZkUrl(syncConf, haGroupName, zkUrl);
+   assertNotNull(storeClient);
+
+   // Migration: existing=null (unparseable) but stat populated -> CAS overwrite path.
+   Pair<ClusterRoleRecord, Stat> migrated = awaitLegacyCrrPresent(haGroupName);
+   ClusterRoleRecord migratedRecord = migrated.getLeft();
+
+   assertEquals("Migrated record must carry registryType=ZK", ClusterRoleRecord.RegistryType.ZK,
+     migratedRecord.getRegistryType());
+   assertEquals("Local role must reflect live consistentHA state (seeded ACTIVE)",
+     ClusterRoleRecord.ClusterRole.ACTIVE,
+     migratedRecord.getRole(formattedZkUrlFor(ClusterType.LOCAL)));
+   // The seeded version=7 cannot be recovered because the bytes do not parse; per the original
+   // note, buildDesiredLegacyCrr sees existing=null and derives the new version from
+   // local.getAdminCRRVersion(). Only monotonic-from-zero is asserted here.
+   assertTrue("Migrated record must have positive version", migratedRecord.getVersion() >= 1L);
+   // ZK stat reflects an overwrite (CAS at stat.version=0 produced stat.version=1).
+   assertEquals("ZK stat version must reflect overwrite", 1, migrated.getRight().getVersion());
+ }
+
+ // ---------- Legacy CRR sync test helpers ----------
+
+ /**
+  * Returns a copy of cluster 1's HBase configuration with the legacy CRR sync flag and the
+  * reconciliation interval (in seconds) overridden by the given values.
+  */
+ private Configuration legacyCrrConf(boolean legacyEnabled, long periodicSec) {
+   Configuration copy = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
+   copy.setLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, periodicSec);
+   copy.setBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, legacyEnabled);
+   return copy;
+ }
+
+ /** Fetches the legacy CRR and its ZK stat for {@code haGroupName} via {@code legacyHaAdmin}. */
+ private Pair<ClusterRoleRecord, Stat> readLegacyCrr(String haGroupName) throws IOException {
+   Pair<ClusterRoleRecord, Stat> recordAndStat =
+     legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+   return recordAndStat;
+ }
+
+ /**
+  * Re-reads the legacy CRR every 100 ms until a record is present and {@code condition} accepts
+  * it, or until {@code ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS} has elapsed. Fails the test if
+  * the last record read is missing or is still rejected by {@code condition}.
+  * @param haGroupName HA group whose legacy znode is polled
+  * @param condition   predicate the record must satisfy
+  * @param description text appended to the assertion messages
+  * @return the last record/stat pair read
+  */
+ private Pair<ClusterRoleRecord, Stat> awaitLegacyCrr(String haGroupName,
+   Predicate<ClusterRoleRecord> condition, String description) throws Exception {
+   long deadline = System.currentTimeMillis() + ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS;
+   Pair<ClusterRoleRecord, Stat> snapshot = readLegacyCrr(haGroupName);
+   for (;;) {
+     ClusterRoleRecord current = snapshot.getLeft();
+     // Check the record first and the clock second, so a satisfied condition ends the wait
+     // regardless of the deadline.
+     boolean satisfied = current != null && condition.test(current);
+     if (satisfied || System.currentTimeMillis() >= deadline) {
+       break;
+     }
+     Thread.sleep(100);
+     snapshot = readLegacyCrr(haGroupName);
+   }
+   ClusterRoleRecord finalRecord = snapshot.getLeft();
+   assertNotNull("Legacy znode missing while awaiting: " + description, finalRecord);
+   assertTrue("Legacy CRR condition not met within timeout: " + description,
+     condition.test(finalRecord));
+   return snapshot;
+ }
+
+ /** Waits until any legacy CRR can be read for {@code haGroupName}, whatever its content. */
+ private Pair<ClusterRoleRecord, Stat> awaitLegacyCrrPresent(String haGroupName)
+   throws Exception {
+   Predicate<ClusterRoleRecord> acceptAny = ignored -> true;
+   return awaitLegacyCrr(haGroupName, acceptAny, "znode present");
+ }
+
+ /**
+  * Waits until the legacy CRR reports {@code expectedRole} for the LOCAL or PEER cluster URL
+  * selected by {@code clusterType}.
+  */
+ private Pair<ClusterRoleRecord, Stat> awaitLegacyCrrRole(String haGroupName,
+   ClusterType clusterType, ClusterRoleRecord.ClusterRole expectedRole) throws Exception {
+   String roleUrl = formattedZkUrlFor(clusterType);
+   Predicate<ClusterRoleRecord> roleMatches =
+     candidate -> expectedRole == candidate.getRole(roleUrl);
+   String description = clusterType + " role == " + expectedRole;
+   return awaitLegacyCrr(haGroupName, roleMatches, description);
+ }
+
+ /**
+  * Returns {@code zkUrl} (for LOCAL) or {@code peerZKUrl} (for any other cluster type) passed
+  * through {@code JDBCUtil.formatUrl} with the ZK registry type.
+  */
+ private String formattedZkUrlFor(ClusterType clusterType) {
+   String unformatted;
+   if (clusterType == ClusterType.LOCAL) {
+     unformatted = zkUrl;
+   } else {
+     unformatted = peerZKUrl;
+   }
+   return JDBCUtil.formatUrl(unformatted, ClusterRoleRecord.RegistryType.ZK);
+ }
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
index e6e71d86a3..4a2e43a3b8 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
@@ -229,6 +229,110 @@ public class ClusterRoleRecordTest {
assertEquals(ClusterRole.UNKNOWN, role);
}
+ /**
+  * A JSON document that carries no {@code registryType} field must deserialize with the
+  * registry type set to RPC, with the remaining fields read as written in the fixture.
+  */
+ @Test
+ public void testFromJsonWithoutRegistryTypeDefaultsToRpc() throws IOException {
+   Optional<ClusterRoleRecord> parsed =
+     ClusterRoleRecord.fromJson(readFile("json/test_role_record_no_registry_type.json"));
+   assertTrue(parsed.isPresent());
+
+   ClusterRoleRecord decoded = parsed.get();
+   assertEquals(ClusterRoleRecord.RegistryType.RPC, decoded.getRegistryType());
+   assertEquals(HighAvailabilityPolicy.FAILOVER, decoded.getPolicy());
+   assertEquals(7L, decoded.getVersion());
+   assertEquals(ClusterRole.ACTIVE, decoded.getRole1());
+   assertEquals(ClusterRole.STANDBY, decoded.getRole2());
+ }
+
+ /** A fixture with an explicit {@code registryType=RPC} must be read back as RPC. */
+ @Test
+ public void testFromJsonExplicitRpcRegistryTypeRoundTrips() throws IOException {
+   byte[] fixture = readFile("json/test_role_record_explicit_rpc.json");
+   Optional<ClusterRoleRecord> parsed = ClusterRoleRecord.fromJson(fixture);
+   assertTrue(parsed.isPresent());
+
+   ClusterRoleRecord decoded = parsed.get();
+   assertEquals(ClusterRoleRecord.RegistryType.RPC, decoded.getRegistryType());
+   assertEquals(11L, decoded.getVersion());
+ }
+
+ /** A fixture with an explicit {@code registryType=ZK} must be read back as ZK. */
+ @Test
+ public void testFromJsonExplicitZkRegistryTypeRoundTrips() throws IOException {
+   byte[] fixture = readFile("json/test_role_record_explicit_zk.json");
+   Optional<ClusterRoleRecord> parsed = ClusterRoleRecord.fromJson(fixture);
+   assertTrue(parsed.isPresent());
+
+   ClusterRoleRecord decoded = parsed.get();
+   assertEquals(ClusterRoleRecord.RegistryType.ZK, decoded.getRegistryType());
+   assertEquals(13L, decoded.getVersion());
+ }
+
+ /**
+  * Serializing a ZK-registry record with {@code toJson} and parsing it back with
+  * {@code fromJson} must keep {@code registryType} and yield a record equal to the input.
+  */
+ @Test
+ public void testToFromJsonPreservesZkRegistryTypeAcrossRoundTrip() throws IOException {
+   ClusterRoleRecord source = new ClusterRoleRecord(
+     testName.getMethodName(),
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     "zk1\\:2181::/hbase",
+     ClusterRole.ACTIVE,
+     "zk2\\:2181::/hbase",
+     ClusterRole.STANDBY,
+     42L);
+
+   Optional<ClusterRoleRecord> roundTripped =
+     ClusterRoleRecord.fromJson(ClusterRoleRecord.toJson(source));
+   assertTrue(roundTripped.isPresent());
+
+   ClusterRoleRecord decoded = roundTripped.get();
+   assertEquals(ClusterRoleRecord.RegistryType.ZK, decoded.getRegistryType());
+   assertEquals(source, decoded);
+ }
+
+ // Tests for isLogicallyEqualIgnoringVersionAndRegistry
+
+ /** Comparing a record against {@code null} must report "not logically equal". */
+ @Test
+ public void testLogicalEquality_nullOther() {
+   ClusterRoleRecord subject = new ClusterRoleRecord(
+     "g",
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     "zk1\\:2181::/hbase",
+     ClusterRole.ACTIVE,
+     "zk2\\:2181::/hbase",
+     ClusterRole.STANDBY,
+     1L);
+   assertFalse(subject.isLogicallyEqualIgnoringVersionAndRegistry(null));
+ }
+
+ /**
+  * Two records that differ only in their version (1 vs 42) must be logically equal, and the
+  * comparison must give the same answer in both directions.
+  */
+ @Test
+ public void testLogicalEquality_sameFieldsDifferentVersion() {
+   ClusterRoleRecord versionOne = new ClusterRoleRecord(
+     "g",
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     "zk1\\:2181::/hbase",
+     ClusterRole.ACTIVE,
+     "zk2\\:2181::/hbase",
+     ClusterRole.STANDBY,
+     1L);
+   ClusterRoleRecord versionFortyTwo = new ClusterRoleRecord(
+     "g",
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     "zk1\\:2181::/hbase",
+     ClusterRole.ACTIVE,
+     "zk2\\:2181::/hbase",
+     ClusterRole.STANDBY,
+     42L);
+   assertTrue(versionOne.isLogicallyEqualIgnoringVersionAndRegistry(versionFortyTwo));
+   assertTrue(versionFortyTwo.isLogicallyEqualIgnoringVersionAndRegistry(versionOne));
+ }
+
+ /**
+  * A ZK-form record and an RPC-form record with the same roles but different URLs must not be
+  * logically equal.
+  */
+ @Test
+ public void testLogicalEquality_zkVsRpcWithDifferentUrlForms() {
+   // The roles match, but the URL forms differ (different registryType normalization), so
+   // the normalized url1/url2 differ and the records are NOT equal.
+   ClusterRoleRecord zkRecord = new ClusterRoleRecord(
+     "g",
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     "zk1\\:2181::/hbase",
+     ClusterRole.ACTIVE,
+     "zk2\\:2181::/hbase",
+     ClusterRole.STANDBY,
+     1L);
+   ClusterRoleRecord rpcRecord = new ClusterRoleRecord(
+     "g",
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.RPC,
+     "master1\\:16000",
+     ClusterRole.ACTIVE,
+     "master2\\:16000",
+     ClusterRole.STANDBY,
+     1L);
+   assertFalse(zkRecord.isLogicallyEqualIgnoringVersionAndRegistry(rpcRecord));
+ }
+
+ /** Records that differ only in the second role (STANDBY vs OFFLINE) are not logically equal. */
+ @Test
+ public void testLogicalEquality_differentRole() {
+   ClusterRoleRecord standbyPeer = new ClusterRoleRecord(
+     "g",
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     "zk1\\:2181::/hbase",
+     ClusterRole.ACTIVE,
+     "zk2\\:2181::/hbase",
+     ClusterRole.STANDBY,
+     1L);
+   ClusterRoleRecord offlinePeer = new ClusterRoleRecord(
+     "g",
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     "zk1\\:2181::/hbase",
+     ClusterRole.ACTIVE,
+     "zk2\\:2181::/hbase",
+     ClusterRole.OFFLINE,
+     1L);
+   assertFalse(standbyPeer.isLogicallyEqualIgnoringVersionAndRegistry(offlinePeer));
+ }
+
+ /** Records that differ only in HA group name ("g1" vs "g2") are not logically equal. */
+ @Test
+ public void testLogicalEquality_differentHaGroupName() {
+   ClusterRoleRecord groupOne = new ClusterRoleRecord(
+     "g1",
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     "zk1\\:2181::/hbase",
+     ClusterRole.ACTIVE,
+     "zk2\\:2181::/hbase",
+     ClusterRole.STANDBY,
+     1L);
+   ClusterRoleRecord groupTwo = new ClusterRoleRecord(
+     "g2",
+     HighAvailabilityPolicy.FAILOVER,
+     ClusterRoleRecord.RegistryType.ZK,
+     "zk1\\:2181::/hbase",
+     ClusterRole.ACTIVE,
+     "zk2\\:2181::/hbase",
+     ClusterRole.STANDBY,
+     1L);
+   assertFalse(groupOne.isLogicallyEqualIgnoringVersionAndRegistry(groupTwo));
+ }
+
// Private Helper Methods
private ClusterRoleRecord getClusterRoleRecord(String name,
HighAvailabilityPolicy policy,
diff --git
a/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json
b/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json
new file mode 100644
index 0000000000..e2b7269b48
--- /dev/null
+++ b/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json
@@ -0,0 +1,10 @@
+{
+ "haGroupName" : "testFromJsonExplicitRpcRoundTrips",
+ "policy" : "FAILOVER",
+ "registryType" : "RPC",
+ "url1" : "url1\\:2181",
+ "role1" : "ACTIVE",
+ "url2" : "url2\\:2181",
+ "role2" : "STANDBY",
+ "version" : 11
+}
diff --git
a/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json
b/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json
new file mode 100644
index 0000000000..1e9ce174ef
--- /dev/null
+++ b/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json
@@ -0,0 +1,10 @@
+{
+ "haGroupName" : "testFromJsonExplicitZkRoundTrips",
+ "policy" : "FAILOVER",
+ "registryType" : "ZK",
+ "url1" : "zk1\\:2181::/hbase",
+ "role1" : "ACTIVE",
+ "url2" : "zk2\\:2181::/hbase",
+ "role2" : "STANDBY",
+ "version" : 13
+}
diff --git
a/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json
b/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json
new file mode 100644
index 0000000000..517e061b65
--- /dev/null
+++
b/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json
@@ -0,0 +1,9 @@
+{
+ "haGroupName" : "testFromJsonWithoutRegistryTypeDefaultsToRpc",
+ "policy" : "FAILOVER",
+ "url1" : "url1\\:2181",
+ "role1" : "ACTIVE",
+ "url2" : "url2\\:2181",
+ "role2" : "STANDBY",
+ "version" : 7
+}