This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new a0fb7d368c PHOENIX-7566 HAGroupStore admin tool: HDFS URLs, URL
validation, failover recovery guidance (#2519)
a0fb7d368c is described below
commit a0fb7d368ca1e40412a1cfac56dc0c426762580d
Author: ritegarg <[email protected]>
AuthorDate: Thu Jun 18 10:41:27 2026 -0700
PHOENIX-7566 HAGroupStore admin tool: HDFS URLs, URL validation, failover
recovery guidance (#2519)
* PHOENIX-7566 HAGroupStore admin tool: HDFS URLs, URL validation,
canonical SYSTEM.HA_GROUP writes, failover recovery guidance
- get/list print HDFS URL and Peer HDFS URL; update accepts
--hdfs-url/--peer-hdfs-url (the options are registered, counted by the
"at least one field to update" guard, and shown in the proposed changes).
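- Validate URL fields on create/update against the registry type the read
path uses (ZK quorum vs RPC/master). --force lets malformed cluster URLs be
stored as-is (with a warning); ZK URLs are always validated, even with
--force, because they identify the HA pair.
HAGroupStoreClient.getHAGroupNames now skips, and logs a WARN for, any row
whose ZK URL will not parse, instead of breaking enumeration for all
callers. get-cluster-role-record now reports an unparseable stored cluster
URL on stderr, annotated with <invalid>, and returns an error code instead
of crashing.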
- create/update now write the SYSTEM.HA_GROUP slot columns (including
HDFS_URL_1/2) in a canonical order keyed on the formatted ZK URL (the
lexicographically smaller ZK URL takes slot 1), so each slot's
ZK/CLUSTER/ROLE/HDFS columns stay paired and both clusters persist
identical rows (matching the periodic ZK->SYSTEM.HA_GROUP sync).
Previously, update always wrote the local cluster into slot 1 and never
wrote the HDFS columns, which could leave ZK_URL_n unpaired from
HDFS_URL_n.
- On initiate-failover/abort-failover timeout, print manual-recovery
guidance to stderr (inspect both clusters, restore connectivity, abort the
failover on the standby, or, as a last resort, force a steady state).
Co-authored-by: Cursor <[email protected]>
---
.../apache/phoenix/jdbc/HAGroupStoreClient.java | 33 +--
.../apache/phoenix/jdbc/PhoenixHAAdminTool.java | 221 ++++++++++++++++++---
.../apache/phoenix/jdbc/PhoenixHAAdminToolIT.java | 201 ++++++++++++++++++-
3 files changed, 408 insertions(+), 47 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index 6543f291f3..edbc530375 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -216,23 +216,28 @@ public class HAGroupStoreClient implements Closeable {
PhoenixConnection conn = (PhoenixConnection) DriverManager
.getConnection(JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
Statement stmt = conn.createStatement(); ResultSet rs =
stmt.executeQuery(queryString)) {
+ // Format the local zkUrl once and strictly: a malformed own URL is a
hard error.
+ String formattedZkUrl = JDBCUtil.formatUrl(zkUrl, RegistryType.ZK);
while (rs.next()) {
+ String haGroupName = rs.getString(HA_GROUP_NAME);
String zkUrl1 = rs.getString(ZK_URL_1);
String zkUrl2 = rs.getString(ZK_URL_2);
- String formattedZkUrl1 = null;
- String formattedZkUrl2 = null;
- if (StringUtils.isNotBlank(zkUrl1)) {
- formattedZkUrl1 = JDBCUtil.formatUrl(zkUrl1, RegistryType.ZK);
- }
- if (StringUtils.isNotBlank(zkUrl2)) {
- formattedZkUrl2 = JDBCUtil.formatUrl(zkUrl2, RegistryType.ZK);
- }
- String formattedZkUrl = JDBCUtil.formatUrl(zkUrl, RegistryType.ZK);
- if (
- StringUtils.equals(formattedZkUrl1, formattedZkUrl)
- || StringUtils.equals(formattedZkUrl2, formattedZkUrl)
- ) {
- result.add(rs.getString(HA_GROUP_NAME));
+ try {
+ String formattedZkUrl1 =
+ StringUtils.isNotBlank(zkUrl1) ? JDBCUtil.formatUrl(zkUrl1,
RegistryType.ZK) : null;
+ String formattedZkUrl2 =
+ StringUtils.isNotBlank(zkUrl2) ? JDBCUtil.formatUrl(zkUrl2,
RegistryType.ZK) : null;
+ if (
+ StringUtils.equals(formattedZkUrl1, formattedZkUrl)
+ || StringUtils.equals(formattedZkUrl2, formattedZkUrl)
+ ) {
+ result.add(haGroupName);
+ }
+ } catch (RuntimeException e) {
+ // One row with an unparseable ZK URL must not break enumeration for
any caller (admin
+ // `list`, PhoenixRegionServerEndpoint,
ReplicationLogReplayService). Skip just that row.
+ LOGGER.warn("Skipping HA group row '{}' with unparseable ZK URL
(zkUrl1={}, zkUrl2={})",
+ haGroupName, zkUrl1, zkUrl2, e);
}
}
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
index de3fb68377..7fbd9d5db5 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
@@ -57,6 +57,7 @@ import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
import org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.util.JDBCUtil;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -323,10 +324,17 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
if (
policy == null && state == null && clusterUrl == null &&
peerClusterUrl == null
&& peerZkUrl == null && protocolVersion == null && lastSyncTimeStr
== null
+ && hdfsUrl == null && peerHdfsUrl == null
) {
throw new IllegalArgumentException("Must specify at least one field to
update");
}
+ // Validate URLs against the registry type the read path normalizes them
with (ZK quorum vs
+ // RPC/master). ZK URLs identify the HA pair and must remain parseable
even with --force.
+ validateUrlsOrThrow(false, ClusterRoleRecord.RegistryType.ZK,
"peer-zk-url", peerZkUrl);
+ validateUrlsOrThrow(force, ClusterRoleRecord.RegistryType.RPC,
"cluster-url", clusterUrl,
+ "peer-cluster-url", peerClusterUrl);
+
// Determine version
long adminVersion;
if (autoIncrement) {
@@ -500,6 +508,63 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
System.out.println();
}
+ /**
+ * Validate that each provided URL parses under {@link
JDBCUtil#formatUrl(String, RegistryType)}
+ * for the given registry type - the same normalization the HA read path
applies (ZK URLs go
+ * through {@code RegistryType.ZK}, cluster URLs through {@code
RegistryType.RPC}). A value that
+ * throws here would later break HA group enumeration and
cluster-role-record construction.
+ * Passing --force stores non-ZK URL values as-is (logged), mirroring the
state/lastSyncTime
+ * force-guard. ZK URLs must be validated with {@code force=false} because
HAGroupStoreClient uses
+ * them as identity keys and cannot safely operate on malformed values.
+ * @param force when true, malformed non-ZK URLs are logged and
allowed through
+ * @param registryType registry type whose normalization the URLs must
satisfy
+ * @param labeledUrls flat [label0, url0, label1, url1, ...] pairs; blank
URLs are skipped
+ * @throws IllegalArgumentException if any URL is malformed and force is
false
+ */
+ private static void validateUrlsOrThrow(boolean force,
+ ClusterRoleRecord.RegistryType registryType, String... labeledUrls) {
+ StringBuilder invalid = new StringBuilder();
+ for (int i = 0; i + 1 < labeledUrls.length; i += 2) {
+ String url = labeledUrls[i + 1];
+ if (StringUtils.isBlank(url)) {
+ continue;
+ }
+ try {
+ JDBCUtil.formatUrl(url, registryType);
+ } catch (RuntimeException e) {
+ if (invalid.length() > 0) {
+ invalid.append("; ");
+ }
+ invalid.append(labeledUrls[i]).append("='").append(url).append("'");
+ }
+ }
+ if (invalid.length() == 0) {
+ return;
+ }
+ if (force) {
+ LOG.warn("Storing malformed URL(s) due to --force: {}", invalid);
+ } else {
+ throw new IllegalArgumentException(
+ "Malformed URL(s): " + invalid + ". Fix the value(s), or pass --force
to store as-is.");
+ }
+ }
+
+ /**
+ * Render a stored URL for display: the raw value, annotated with {@code
<invalid>} when it cannot
+ * be normalized by {@link JDBCUtil#formatUrl(String, RegistryType)} for the
given registry type.
+ */
+ private static String describeUrl(String url, ClusterRoleRecord.RegistryType
registryType) {
+ if (StringUtils.isBlank(url)) {
+ return "null";
+ }
+ try {
+ JDBCUtil.formatUrl(url, registryType);
+ return url;
+ } catch (RuntimeException e) {
+ return url + " <invalid>";
+ }
+ }
+
/**
* Initiates failover on active cluster, transitioning it to standby while
peer becomes active.
* <p>
@@ -626,7 +691,7 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
System.err.println("\n⚠ Failover transition incomplete");
System.err.println(" The failover was initiated but did not
complete within "
+ timeoutSeconds + " seconds.");
- System.err.println(" Check cluster states manually to verify
completion.");
+ printFailoverRecoveryGuidance(haGroupName);
return RET_UPDATE_ERROR;
}
}
@@ -759,7 +824,7 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
System.err.println("\n⚠ Abort transition incomplete");
System.err.println(" The abort was initiated but did not complete
within "
+ timeoutSeconds + " seconds.");
- System.err.println(" Check cluster states manually to verify
completion.");
+ printFailoverRecoveryGuidance(haGroupName);
return RET_UPDATE_ERROR;
}
}
@@ -807,9 +872,24 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
String haGroupName = getRequiredOption(cmdLine, HA_GROUP_OPT, "HA group
name");
HAGroupStoreManager manager = HAGroupStoreManager.getInstance(getConf());
- ClusterRoleRecord clusterRoleRecord =
manager.getClusterRoleRecord(haGroupName);
-
- printClusterRoleRecordAsText(clusterRoleRecord);
+ try {
+
printClusterRoleRecordAsText(manager.getClusterRoleRecord(haGroupName));
+ } catch (RuntimeException e) {
+ // A malformed stored cluster URL makes ClusterRoleRecord
normalization throw; report it on
+ // stderr (bad URL marked <invalid>, with a repair hint) and an error
code, don't crash.
+ HAGroupStoreRecord raw =
manager.getHAGroupStoreRecord(haGroupName).orElse(null);
+ System.err.println("\nCluster Role Record for '" + haGroupName
+ + "' could not be built (likely a malformed stored URL): " +
e.getMessage());
+ if (raw != null) {
+ System.err.println(" Cluster URL: "
+ + describeUrl(raw.getClusterUrl(),
ClusterRoleRecord.RegistryType.RPC));
+ System.err.println(" Peer Cluster URL: "
+ + describeUrl(raw.getPeerClusterUrl(),
ClusterRoleRecord.RegistryType.RPC));
+ }
+ System.err.println(
+ " Repair with: update -g " + haGroupName + " -c <good-url> (or -pc)
-av [--force]");
+ return RET_UPDATE_ERROR;
+ }
return RET_SUCCESS;
@@ -849,6 +929,14 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
String hdfsUrl1 = getRequiredOption(cmdLine, HDFS_URL_1_OPT, "HDFS URL
for cluster 1");
String hdfsUrl2 = getRequiredOption(cmdLine, HDFS_URL_2_OPT, "HDFS URL
for cluster 2");
final boolean dryRun = cmdLine.hasOption(DRY_RUN_OPT.getOpt());
+ final boolean force = cmdLine.hasOption(FORCE_OPT.getOpt());
+
+ // Validate URLs against the registry type the read path normalizes them
with (ZK quorum vs
+ // RPC/master). ZK URLs identify the HA pair and must remain parseable
even with --force.
+ validateUrlsOrThrow(false, ClusterRoleRecord.RegistryType.ZK,
"zk-url-1", zkUrl1, "zk-url-2",
+ zkUrl2);
+ validateUrlsOrThrow(force, ClusterRoleRecord.RegistryType.RPC,
"cluster-url-1", clusterUrl1,
+ "cluster-url-2", clusterUrl2);
long adminVersion = 1L;
String adminVersionStr =
cmdLine.getOptionValue(ADMIN_VERSION_OPT.getOpt());
@@ -933,6 +1021,26 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
return false;
}
+ /**
+ * Whether cluster A takes slot 1 in SYSTEM.HA_GROUP's slot-indexed columns,
keyed on the
+ * formatted ZK URL compared lexicographically. The ordering is
deterministic and independent of
+ * which cluster is local (the same canonicalization {@link
ClusterRoleRecord} applies to its
+ * url1/url2, though that sorts on the cluster URL) and matches the periodic
+ * ZK->SYSTEM.HA_GROUP sync - so both clusters of a pair persist
identical rows and every slot
+ * keeps its ZK/CLUSTER/ROLE/HDFS columns pointing at one cluster (the read
path resolves a
+ * cluster's HDFS URL by matching its ZK URL slot).
+ */
+ private static boolean firstClusterTakesSlot1(String zkUrlA, String zkUrlB) {
+ String formattedZkUrlA = StringUtils.isBlank(zkUrlA)
+ ? null
+ : JDBCUtil.formatUrl(zkUrlA, ClusterRoleRecord.RegistryType.ZK);
+ String formattedZkUrlB = StringUtils.isBlank(zkUrlB)
+ ? null
+ : JDBCUtil.formatUrl(zkUrlB, ClusterRoleRecord.RegistryType.ZK);
+ return StringUtils.isBlank(formattedZkUrlB) ||
(StringUtils.isNotBlank(formattedZkUrlA)
+ && formattedZkUrlA.compareTo(formattedZkUrlB) <= 0);
+ }
+
/**
* Insert a new HA group row into SYSTEM.HA_GROUP using symmetric slot-based
columns.
*/
@@ -941,6 +1049,17 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
String clusterUrl2, ClusterRole clusterRole2, String hdfsUrl2, long
adminVersion,
String localZkUrl) throws SQLException {
+ // Write the slot columns in canonical order so ZK/CLUSTER/ROLE/HDFS stay
paired per slot.
+ boolean firstInSlot1 = firstClusterTakesSlot1(zkUrl1, zkUrl2);
+ String slot1ZkUrl = firstInSlot1 ? zkUrl1 : zkUrl2;
+ String slot1ClusterUrl = firstInSlot1 ? clusterUrl1 : clusterUrl2;
+ ClusterRole slot1ClusterRole = firstInSlot1 ? clusterRole1 : clusterRole2;
+ String slot1HdfsUrl = firstInSlot1 ? hdfsUrl1 : hdfsUrl2;
+ String slot2ZkUrl = firstInSlot1 ? zkUrl2 : zkUrl1;
+ String slot2ClusterUrl = firstInSlot1 ? clusterUrl2 : clusterUrl1;
+ ClusterRole slot2ClusterRole = firstInSlot1 ? clusterRole2 : clusterRole1;
+ String slot2HdfsUrl = firstInSlot1 ? hdfsUrl2 : hdfsUrl1;
+
String insertQuery = "UPSERT INTO " + SYSTEM_HA_GROUP_NAME + " (" +
HA_GROUP_NAME + ", "
+ POLICY + ", " + ZK_URL_1 + ", " + CLUSTER_URL_1 + ", " +
CLUSTER_ROLE_1 + ", " + HDFS_URL_1
+ ", " + ZK_URL_2 + ", " + CLUSTER_URL_2 + ", " + CLUSTER_ROLE_2 + ", "
+ HDFS_URL_2 + ", "
@@ -950,14 +1069,14 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
PreparedStatement pstmt = conn.prepareStatement(insertQuery)) {
pstmt.setString(1, haGroupName);
pstmt.setString(2, policy);
- pstmt.setString(3, zkUrl1);
- pstmt.setString(4, clusterUrl1);
- pstmt.setString(5, clusterRole1.name());
- pstmt.setString(6, hdfsUrl1);
- pstmt.setString(7, zkUrl2);
- pstmt.setString(8, clusterUrl2);
- pstmt.setString(9, clusterRole2.name());
- pstmt.setString(10, hdfsUrl2);
+ pstmt.setString(3, slot1ZkUrl);
+ pstmt.setString(4, slot1ClusterUrl);
+ pstmt.setString(5, slot1ClusterRole.name());
+ pstmt.setString(6, slot1HdfsUrl);
+ pstmt.setString(7, slot2ZkUrl);
+ pstmt.setString(8, slot2ClusterUrl);
+ pstmt.setString(9, slot2ClusterRole.name());
+ pstmt.setString(10, slot2HdfsUrl);
pstmt.setLong(11, adminVersion);
pstmt.executeUpdate();
conn.commit();
@@ -1094,6 +1213,24 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
}
}
+ /**
+ * Print operator guidance shown when a failover / abort does not converge
within the timeout.
+ * Post-partition states (e.g. active ACTIVE_IN_SYNC_TO_STANDBY with peer
STANDBY) are ambiguous
+ * and are intentionally not auto-converged, so recovery is manual.
+ */
+ private void printFailoverRecoveryGuidance(String haGroupName) {
+ System.err.println("\n Recovery (state did not converge - can happen on a
partition):");
+ System.err.println(" 1. Inspect both clusters (run on each):");
+ System.err.println(" get-cluster-role-record -g " + haGroupName);
+ System.err.println(" 2. If the clusters were partitioned, restore
connectivity and re-check."
+ + " Transient states are not auto-converged once a notification is
lost.");
+ System.err.println(" 3. If a cluster is stuck in a transitional *_TO_*
state, abort the"
+ + " failover on the standby; if it still will not converge, force a
steady state (last"
+ + " resort):");
+ System.err.println(" abort-failover -g " + haGroupName);
+ System.err.println(" update -g " + haGroupName + " -s
<ACTIVE_IN_SYNC|STANDBY> -av -F");
+ }
+
/**
* Perform the update operation
*/
@@ -1285,10 +1422,22 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
LOG.warn("Could not read peer record, using UNKNOWN for peer role", e);
}
+ // Write all slot columns (incl HDFS) in canonical order so
ZK/CLUSTER/ROLE/HDFS stay paired per
+ // slot. Previously this wrote local-first and never wrote HDFS, which
mispaired the row.
+ boolean localInSlot1 = firstClusterTakesSlot1(localZkUrl,
record.getPeerZKUrl());
+ ClusterRole slot1ClusterRole = localInSlot1 ? record.getClusterRole() :
peerRole;
+ ClusterRole slot2ClusterRole = localInSlot1 ? peerRole :
record.getClusterRole();
+ String slot1ClusterUrl = localInSlot1 ? record.getClusterUrl() :
record.getPeerClusterUrl();
+ String slot2ClusterUrl = localInSlot1 ? record.getPeerClusterUrl() :
record.getClusterUrl();
+ String slot1ZkUrl = localInSlot1 ? localZkUrl : record.getPeerZKUrl();
+ String slot2ZkUrl = localInSlot1 ? record.getPeerZKUrl() : localZkUrl;
+ String slot1HdfsUrl = localInSlot1 ? record.getHdfsUrl() :
record.getPeerHdfsUrl();
+ String slot2HdfsUrl = localInSlot1 ? record.getPeerHdfsUrl() :
record.getHdfsUrl();
+
String updateQuery = "UPSERT INTO " + SYSTEM_HA_GROUP_NAME + " " + "(" +
HA_GROUP_NAME + ", "
+ POLICY + ", " + CLUSTER_ROLE_1 + ", " + CLUSTER_ROLE_2 + ", " +
CLUSTER_URL_1 + ", "
- + CLUSTER_URL_2 + ", " + ZK_URL_1 + ", " + ZK_URL_2 + ", " + VERSION +
") "
- + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ + CLUSTER_URL_2 + ", " + ZK_URL_1 + ", " + ZK_URL_2 + ", " + HDFS_URL_1
+ ", " + HDFS_URL_2
+ + ", " + VERSION + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
try (
PhoenixConnection conn = (PhoenixConnection) DriverManager
@@ -1297,13 +1446,15 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
pstmt.setString(1, haGroupName);
pstmt.setString(2, record.getPolicy());
- pstmt.setString(3, record.getClusterRole().name());
- pstmt.setString(4, peerRole.name());
- pstmt.setString(5, record.getClusterUrl());
- pstmt.setString(6, record.getPeerClusterUrl());
- pstmt.setString(7, localZkUrl);
- pstmt.setString(8, record.getPeerZKUrl());
- pstmt.setLong(9, record.getAdminCRRVersion());
+ pstmt.setString(3, slot1ClusterRole.name());
+ pstmt.setString(4, slot2ClusterRole.name());
+ pstmt.setString(5, slot1ClusterUrl);
+ pstmt.setString(6, slot2ClusterUrl);
+ pstmt.setString(7, slot1ZkUrl);
+ pstmt.setString(8, slot2ZkUrl);
+ pstmt.setString(9, slot1HdfsUrl);
+ pstmt.setString(10, slot2HdfsUrl);
+ pstmt.setLong(11, record.getAdminCRRVersion());
pstmt.executeUpdate();
conn.commit();
@@ -1345,6 +1496,16 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
hasChanges = true;
}
+ if (!Objects.equals(current.getHdfsUrl(), proposed.getHdfsUrl())) {
+ printFieldChange("HDFS URL", current.getHdfsUrl(),
proposed.getHdfsUrl());
+ hasChanges = true;
+ }
+
+ if (!Objects.equals(current.getPeerHdfsUrl(), proposed.getPeerHdfsUrl())) {
+ printFieldChange("Peer HDFS URL", current.getPeerHdfsUrl(),
proposed.getPeerHdfsUrl());
+ hasChanges = true;
+ }
+
if (!Objects.equals(current.getProtocolVersion(),
proposed.getProtocolVersion())) {
printFieldChange("Protocol Version", current.getProtocolVersion(),
proposed.getProtocolVersion());
@@ -1415,6 +1576,8 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
System.out.println(" Cluster URL: " + record.getClusterUrl());
System.out.println(" Peer Cluster URL: " + record.getPeerClusterUrl());
System.out.println(" Peer ZK URL: " + record.getPeerZKUrl());
+ System.out.println(" HDFS URL: " + record.getHdfsUrl());
+ System.out.println(" Peer HDFS URL: " + record.getPeerHdfsUrl());
System.out.println(" Admin Version: " +
record.getAdminCRRVersion());
System.out
.println(" Last Sync Time: " +
formatTimestamp(record.getLastSyncStateTimeInMs()));
@@ -1430,7 +1593,7 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
.addOption(ZK_URL_1_OPT).addOption(CLUSTER_URL_1_OPT).addOption(CLUSTER_ROLE_1_OPT)
.addOption(ZK_URL_2_OPT).addOption(CLUSTER_URL_2_OPT).addOption(CLUSTER_ROLE_2_OPT)
.addOption(HDFS_URL_1_OPT).addOption(HDFS_URL_2_OPT).addOption(ADMIN_VERSION_OPT)
- .addOption(DRY_RUN_OPT);
+ .addOption(FORCE_OPT).addOption(DRY_RUN_OPT);
}
/**
@@ -1440,8 +1603,8 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
return new
Options().addOption(HELP_OPT).addOption(HA_GROUP_OPT).addOption(ADMIN_VERSION_OPT)
.addOption(AUTO_INCREMENT_VERSION_OPT).addOption(POLICY_OPT).addOption(STATE_OPT)
.addOption(CLUSTER_URL_OPT).addOption(PEER_CLUSTER_URL_OPT).addOption(PEER_ZK_URL_OPT)
-
.addOption(PROTOCOL_VERSION_OPT).addOption(LAST_SYNC_TIME_OPT).addOption(FORCE_OPT)
- .addOption(DRY_RUN_OPT);
+
.addOption(PROTOCOL_VERSION_OPT).addOption(LAST_SYNC_TIME_OPT).addOption(HDFS_URL_OPT)
+
.addOption(PEER_HDFS_URL_OPT).addOption(FORCE_OPT).addOption(DRY_RUN_OPT);
}
/**
@@ -1586,6 +1749,8 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
System.out.println();
System.out.println("OPTIONAL:");
System.out.println(" -v, --admin-version <ver> Initial admin
version (default: 1)");
+ System.out.println(
+ " -F, --force Store malformed cluster URLs as-is;
ZK URLs still checked");
System.out.println(" -d, --dry-run Show what would be
created");
System.out.println(" -h, --help Show this help");
System.out.println();
@@ -1624,11 +1789,15 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
System.out.println(" -c, --cluster-url <url> Local cluster
URL");
System.out.println(" -pc, --peer-cluster-url <url> Peer cluster URL");
System.out.println(" -pz, --peer-zk-url <url> Peer ZK URL");
+ System.out.println(" -hdfsurl, --hdfs-url <url> HDFS URL");
+ System.out.println(" -phdfsurl, --peer-hdfs-url <url> Peer HDFS URL");
System.out.println(" -pv, --protocol-version <ver> Protocol version");
System.out.println(" -lst, --last-sync-time <ms> Last sync time
(requires --force)");
System.out.println();
System.out.println("FLAGS:");
- System.out.println(" -F, --force Allow state and
restricted changes");
+ System.out.println(
+ " -F, --force Allow restricted changes; store
malformed cluster URLs as-is");
+ System.out.println(" ZK URLs are always
validated");
System.out.println(" -d, --dry-run Show changes
without applying");
System.out.println(" -h, --help Show this help");
System.out.println();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
index ca60771c80..c6d110f55e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
@@ -24,6 +24,8 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_2;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HDFS_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HDFS_URL_2;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.POLICY;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VERSION;
@@ -73,8 +75,13 @@ import org.slf4j.LoggerFactory;
public class PhoenixHAAdminToolIT extends HABaseIT {
private static final Logger LOG =
LoggerFactory.getLogger(PhoenixHAAdminToolIT.class);
private static final PrintStream STDOUT = System.out;
+ private static final PrintStream STDERR = System.err;
private static final ByteArrayOutputStream STDOUT_CAPTURE = new
ByteArrayOutputStream();
+ // A ZK URL with a negative port: JDBCUtil.formatUrl rejects it as a parse
failure (not merely
+ // unreachable), which is what exercises the URL-validation and tolerance
paths.
+ private static final String MALFORMED_URL = "nohost:-1";
+
private String haGroupName;
private PhoenixHAAdminTool adminTool;
@@ -189,8 +196,9 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
@After
public void after() throws Exception {
- // reset STDOUT in case it was captured for testing
+ // reset STDOUT/STDERR in case they were captured for testing
System.setOut(STDOUT);
+ System.setErr(STDERR);
// Clean up all HA group records from both clusters after each test
try {
@@ -308,8 +316,8 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
if (rs.next()) {
return new SystemTableRecord(rs.getString(HA_GROUP_NAME),
rs.getString(ZK_URL_1),
rs.getString(ZK_URL_2), rs.getString(CLUSTER_URL_1),
rs.getString(CLUSTER_URL_2),
- rs.getString(CLUSTER_ROLE_1), rs.getString(CLUSTER_ROLE_2),
rs.getString(POLICY),
- rs.getLong(VERSION));
+ rs.getString(CLUSTER_ROLE_1), rs.getString(CLUSTER_ROLE_2),
rs.getString(HDFS_URL_1),
+ rs.getString(HDFS_URL_2), rs.getString(POLICY), rs.getLong(VERSION));
}
}
return null;
@@ -323,17 +331,22 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
final String zkUrl2;
final String clusterUrl1;
final String clusterUrl2;
+ final String hdfsUrl1;
+ final String hdfsUrl2;
final String policy;
final long version;
SystemTableRecord(String haGroupName, String zkUrl1, String zkUrl2, String
clusterUrl1,
- String clusterUrl2, String clusterRole1, String clusterRole2, String
policy, long version) {
+ String clusterUrl2, String clusterRole1, String clusterRole2, String
hdfsUrl1,
+ String hdfsUrl2, String policy, long version) {
// Note: haGroupName, clusterRole1, and clusterRole2 are intentionally
not stored
// as they are not currently needed for verification
this.zkUrl1 = zkUrl1;
this.zkUrl2 = zkUrl2;
this.clusterUrl1 = clusterUrl1;
this.clusterUrl2 = clusterUrl2;
+ this.hdfsUrl1 = hdfsUrl1;
+ this.hdfsUrl2 = hdfsUrl2;
this.policy = policy;
this.version = version;
}
@@ -722,7 +735,10 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
LOG.info("NOT setting up listener - simulating stuck failover where reader
doesn't react");
// === EXECUTE INITIATE-FAILOVER COMMAND WITH SHORT TIMEOUT ===
- System.setOut(new PrintStream(STDOUT_CAPTURE));
+ // Capture both stdout and stderr: the recovery guidance is written to
stderr.
+ PrintStream capture = new PrintStream(STDOUT_CAPTURE);
+ System.setOut(capture);
+ System.setErr(capture);
PhoenixHAAdminTool cluster1AdminTool = new PhoenixHAAdminTool();
cluster1AdminTool.setConf(CLUSTERS.getHBaseCluster1().getConfiguration());
@@ -744,6 +760,12 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
assertTrue("Output should indicate transition incomplete",
output.contains("Failover transition incomplete") ||
output.contains("Timeout"));
+ // On timeout, operators should be pointed at the manual recovery steps.
+ assertTrue("Timeout output should include recovery guidance",
+ output.contains("Recovery") &&
output.contains("get-cluster-role-record"));
+ assertTrue("Timeout output should suggest abort-failover on the standby",
+ output.contains("abort-failover"));
+
// === VERIFY INTERMEDIATE STATE ===
// Cluster1 should be stuck in ACTIVE_IN_SYNC_TO_STANDBY (not reached
STANDBY)
waitForHAGroupState(cluster1HAManager, timeoutFailoverHaGroupName,
@@ -807,8 +829,9 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
// Update peer ZK URL, cluster URL, and peer cluster URL using
auto-increment version
String newPeerZkUrl = "localhost:9999:/test";
- String newClusterUrl = "localhost:16020,localhost:16021,localhost:16022";
- String newPeerClusterUrl =
"localhost:16030,localhost:16031,localhost:16032";
+ // Multi-master URL form is comma-separated hosts with a single shared
port.
+ String newClusterUrl = "host1,host2,host3:16020";
+ String newPeerClusterUrl = "host4,host5,host6:16030";
int ret = ToolRunner.run(adminTool, new String[] { "update", "-g",
updateHaGroupName, "-pz",
newPeerZkUrl, "-c", newClusterUrl, "-pc", newPeerClusterUrl, "-av" });
@@ -877,6 +900,10 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
systemRecord.clusterUrl1);
assertEquals("System table CLUSTER_URL_2 should be updated (peer
cluster)", newPeerClusterUrl,
systemRecord.clusterUrl2);
+ assertEquals("System table HDFS_URL_1 should pair with local cluster
slot",
+ CLUSTERS.getHdfsUrl1(), systemRecord.hdfsUrl1);
+ assertEquals("System table HDFS_URL_2 should pair with peer cluster
slot",
+ CLUSTERS.getHdfsUrl2(), systemRecord.hdfsUrl2);
} else {
// Local cluster is in position 2, peer is in position 1
assertEquals("System table ZK_URL_2 should remain unchanged (local
cluster)",
@@ -887,6 +914,10 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
systemRecord.clusterUrl2);
assertEquals("System table CLUSTER_URL_1 should be updated (peer
cluster)", newPeerClusterUrl,
systemRecord.clusterUrl1);
+ assertEquals("System table HDFS_URL_2 should pair with local cluster
slot",
+ CLUSTERS.getHdfsUrl1(), systemRecord.hdfsUrl2);
+ assertEquals("System table HDFS_URL_1 should pair with peer cluster
slot",
+ CLUSTERS.getHdfsUrl2(), systemRecord.hdfsUrl1);
}
// Verify policy and roles remain unchanged in system table
@@ -1040,4 +1071,160 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
assertEquals("create command should return RET_ARGUMENT_ERROR when
--policy is missing",
RET_ARGUMENT_ERROR, ret);
}
+
+  /**
+   * get output should include the HDFS URL and Peer HDFS URL of the record.
+   */
+  @Test(timeout = 180000)
+  public void testGetCommandShowsHdfsUrls() throws Exception {
+    System.setOut(new PrintStream(STDOUT_CAPTURE));
+
+    int ret = ToolRunner.run(adminTool, new String[] { "get", "-g", "prod_cluster_alpha" });
+    assertEquals(RET_SUCCESS, ret);
+
+    String output = STDOUT_CAPTURE.toString();
+    LOG.info("Got stdout from get command: \n++++++++\n{}++++++++\n", output);
+    assertTrue("Output should label HDFS URL", output.contains("HDFS URL:"));
+    assertTrue("Output should label Peer HDFS URL", output.contains("Peer HDFS URL:"));
+    assertTrue("Output should show the local HDFS URL value",
+      output.contains(CLUSTERS.getHdfsUrl1()));
+    assertTrue("Output should show the peer HDFS URL value",
+      output.contains(CLUSTERS.getHdfsUrl2()));
+  }
+
+  /**
+   * update accepts HDFS URL fields on their own and persists them to ZK.
+   */
+  @Test(timeout = 180000)
+  public void testUpdateCommandHdfsUrlsOnly() throws Exception {
+    HAGroupStoreManager manager =
+      new HAGroupStoreManager(CLUSTERS.getHBaseCluster1().getConfiguration());
+    Optional<HAGroupStoreRecord> initialOpt = manager.getHAGroupStoreRecord(haGroupName);
+    assertTrue("Initial record should exist", initialOpt.isPresent());
+    long initialVersion = initialOpt.get().getAdminCRRVersion();
+
+    System.setOut(new PrintStream(STDOUT_CAPTURE));
+    String newHdfsUrl = "hdfs://newlocal:8020";
+    String newPeerHdfsUrl = "hdfs://newpeer:8020";
+    int ret = ToolRunner.run(adminTool, new String[] { "update", "-g", haGroupName, "-hdfsurl",
+      newHdfsUrl, "-phdfsurl", newPeerHdfsUrl, "-av" });
+    assertEquals("HDFS-only update should succeed", RET_SUCCESS, ret);
+
+    Thread.sleep(2000);
+    Optional<HAGroupStoreRecord> updatedOpt = manager.getHAGroupStoreRecord(haGroupName);
+    assertTrue("Record should exist after update", updatedOpt.isPresent());
+    HAGroupStoreRecord updated = updatedOpt.get();
+    assertEquals("Version should increment", initialVersion + 1, updated.getAdminCRRVersion());
+    assertEquals("HDFS URL should be updated", newHdfsUrl, updated.getHdfsUrl());
+    assertEquals("Peer HDFS URL should be updated", newPeerHdfsUrl, updated.getPeerHdfsUrl());
+  }
+
+  /**
+   * update rejects malformed ZK URL fields even with --force: ZK URLs are identity keys used by
+   * HAGroupStoreClient and must remain parseable.
+   */
+  @Test(timeout = 180000)
+  public void testUpdateRejectsMalformedPeerZkUrlEvenWhenForced() throws Exception {
+    // Initialize the ZK znode from the @Before system-table row so -av can read the version.
+    new HAGroupStoreManager(CLUSTERS.getHBaseCluster1().getConfiguration())
+      .getHAGroupStoreRecord(haGroupName);
+
+    System.setOut(new PrintStream(STDOUT_CAPTURE));
+    int ret = ToolRunner.run(adminTool,
+      new String[] { "update", "-g", haGroupName, "-pz", MALFORMED_URL, "-av" });
+    assertEquals("Malformed peer ZK URL should be rejected", RET_ARGUMENT_ERROR, ret);
+
+    STDOUT_CAPTURE.reset();
+    int forcedRet = ToolRunner.run(adminTool,
+      new String[] { "update", "-g", haGroupName, "-pz", MALFORMED_URL, "-av", "-F" });
+    assertEquals("Malformed peer ZK URL should be rejected even with --force", RET_ARGUMENT_ERROR,
+      forcedRet);
+  }
+
+  /**
+   * create rejects a malformed URL field and writes nothing.
+   */
+  @Test(timeout = 180000)
+  public void testCreateRejectsMalformedUrl() throws Exception {
+    String createGroup = "testCreateBadUrl_" + System.currentTimeMillis();
+    System.setOut(new PrintStream(STDOUT_CAPTURE));
+
+    int ret = ToolRunner.run(adminTool,
+      new String[] { "create", "-g", createGroup, "-p", "FAILOVER", "-zk1", CLUSTERS.getZkUrl1(),
+        "-c1", MALFORMED_URL, "-cr1", "ACTIVE", "-zk2", CLUSTERS.getZkUrl2(), "-c2",
+        CLUSTERS.getMasterAddress2(), "-cr2", "STANDBY", "-hdfs1", CLUSTERS.getHdfsUrl1(), "-hdfs2",
+        CLUSTERS.getHdfsUrl2() });
+    assertEquals("Malformed cluster-url-1 should be rejected on create", RET_ARGUMENT_ERROR, ret);
+
+    assertTrue("No row should be created for a rejected create",
+      querySystemTable(createGroup, CLUSTERS.getZkUrl1()) == null);
+  }
+
+  /**
+   * create rejects malformed ZK URL fields even with --force.
+   */
+  @Test(timeout = 180000)
+  public void testCreateRejectsMalformedZkUrlEvenWhenForced() throws Exception {
+    String createGroup = "testCreateBadZkUrl_" + System.currentTimeMillis();
+    System.setOut(new PrintStream(STDOUT_CAPTURE));
+
+    int ret = ToolRunner.run(adminTool,
+      new String[] { "create", "-g", createGroup, "-p", "FAILOVER", "-zk1", MALFORMED_URL, "-c1",
+        CLUSTERS.getMasterAddress1(), "-cr1", "ACTIVE", "-zk2", CLUSTERS.getZkUrl2(), "-c2",
+        CLUSTERS.getMasterAddress2(), "-cr2", "STANDBY", "-hdfs1", CLUSTERS.getHdfsUrl1(), "-hdfs2",
+        CLUSTERS.getHdfsUrl2(), "-F" });
+    assertEquals("Malformed zk-url-1 should be rejected even with --force", RET_ARGUMENT_ERROR,
+      ret);
+
+    assertTrue("No row should be created for a rejected create",
+      querySystemTable(createGroup, CLUSTERS.getZkUrl1()) == null);
+  }
+
+  /**
+   * A single SYSTEM.HA_GROUP row with an unparseable ZK URL must not break `list` enumeration.
+   */
+  @Test(timeout = 180000)
+  public void testListSkipsRowWithMalformedZkUrl() throws Exception {
+    String badRowGroup = "testBadZkRow_" + System.currentTimeMillis();
+    // Store malformed ZK URLs, but connect via the real local cluster (overrideConnZkUrl).
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(badRowGroup, MALFORMED_URL, MALFORMED_URL,
+      CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+      ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY,
+      CLUSTERS.getZkUrl1(), CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2());
+
+    System.setOut(new PrintStream(STDOUT_CAPTURE));
+    int ret = ToolRunner.run(adminTool, new String[] { "list" });
+    assertEquals("list must not crash when one row has a malformed ZK URL", RET_SUCCESS, ret);
+
+    String output = STDOUT_CAPTURE.toString();
+    assertTrue("Valid group should still be listed", output.contains("prod_cluster_alpha"));
+    assertTrue("Bad-URL row should be skipped from enumeration", !output.contains(badRowGroup));
+  }
+
+  /**
+   * get-cluster-role-record reports a malformed stored cluster URL (marked <invalid>) on
+   * stderr and returns an error code rather than crashing.
+   */
+  @Test(timeout = 180000)
+  public void testGetClusterRoleRecordMarksInvalidUrl() throws Exception {
+    String badUrlGroup = "testCrrBadUrl_" + System.currentTimeMillis();
+    // Local cluster URL is unparseable; zk URLs are valid so the group still enumerates.
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(badUrlGroup, CLUSTERS.getZkUrl1(),
+      CLUSTERS.getZkUrl2(), MALFORMED_URL, CLUSTERS.getMasterAddress2(),
+      ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null,
+      CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2());
+
+    // Diagnostic goes to stderr; capture both streams.
+    PrintStream capture = new PrintStream(STDOUT_CAPTURE);
+    System.setOut(capture);
+    System.setErr(capture);
+    int ret =
+      ToolRunner.run(adminTool, new String[] { "get-cluster-role-record", "-g", badUrlGroup });
+
+    assertEquals("get-cluster-role-record should report a malformed stored URL as an error",
+      PhoenixHAAdminTool.RET_UPDATE_ERROR, ret);
+    String output = STDOUT_CAPTURE.toString();
+    LOG.info("Got output from get-cluster-role-record (bad url): \n++++++++\n{}++++++++\n", output);
+    assertTrue("Output should mark the malformed URL <invalid>", output.contains("<invalid>"));
+  }
}