This is an automated email from the ASF dual-hosted git repository.
lokiore pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 0a3c416171 PHOENIX-7898 :- Cloned PhoenixConnection loses HA group,
breaking _HAGroupName tagging on UPSERT SELECT and DELETE (#2515)
0a3c416171 is described below
commit 0a3c4161719c5c23e47c4dc4a0debe4ebd106284
Author: Lokesh Khurana <[email protected]>
AuthorDate: Fri Jun 12 16:50:17 2026 -0700
PHOENIX-7898 :- Cloned PhoenixConnection loses HA group, breaking
_HAGroupName tagging on UPSERT SELECT and DELETE (#2515)
* PHOENIX-7898 :- Cloned PhoenixConnection loses HA group, breaking
_HAGroupName tagging on UPSERT SELECT and DELETE
The four PhoenixConnection copy constructors that take an existing
PhoenixConnection as the first argument all delegate to the 9-arg
private constructor with `null` passed for the cloned connection's
haGroup field. Cloned connections produced by
MutatingParallelIteratorFactory.newIterator(...) (server-side
coprocessor for UPSERT SELECT and DELETE) therefore evaluate the
MutationState.annotateMutationWithMetadata guard
`if (connection.getHAGroup() != null && ...)` to false, the
_HAGroupName HBase attribute is not attached to those mutations, and
the consistent_failover (CCF) server's mutation-block contract is not
enforced for the affected write paths.
Three localized changes in phoenix-core-client:
1. PhoenixConnection.java -- four copy constructors propagate
connection.haGroup instead of null. Restores HA-group inheritance
for cloned connections produced by MutatingParallelIteratorFactory
and other clone paths.
2. PhoenixConnection.java -- split duplicated
`// Only available for connection which is part of HA Connections`
Javadoc on haGroupName / haGroup into field-specific descriptions
that document each field's distinct role.
3. ConnectionQueryServices.java -- convert the 3-arg
connect(String, Properties, HighAvailabilityGroup) overload from
abstract to default, delegating to the 2-arg connect(url, info).
Preserves source-compatibility for any 3rd-party
ConnectionQueryServices implementor that has not yet adopted the
3-arg overload.
Adds testUpsertSelectReplicatesViaCloneConnection in
ReplicationLogGroupIT to lock the regression. The test exercises an
UPSERT SELECT of 250 rows (above the default MUTATE_BATCH_SIZE = 100)
so the in-loop flush at UpsertCompiler.upsertSelect fires through the
cloned connection's MutationState.send() path -- the only path where
the missing haGroup field is observable. Pre-fix: target table
receives 50 of 250 rows (assertion fails). Post-fix: target table
receives 250 of 250 rows (assertion passes).
Generated-by: Claude Code (Opus 4.7)
* PHOENIX-7898 :- Use getHAGroup() instead of direct field access in copy
constructors
Replace direct `connection.haGroup` field access with the public
getter `connection.getHAGroup()` in the four PhoenixConnection copy
constructors. Preserves encapsulation and matches the existing
MutationState.annotateMutationWithMetadata guard which already uses
`connection.getHAGroup() != null`.
Generated-by: Claude Code (Opus 4.7)
* PHOENIX-7898 :- Remove haGroupName field from PhoenixConnection
Removes the redundant PhoenixConnection.haGroupName field; HA group name
is now derived through connection.getHAGroup().getName().
Adds HighAvailabilityGroup.getName() convenience accessor that delegates
to info.getName() so callers in org.apache.phoenix.util and
org.apache.phoenix.execute can access the HA group name without needing
visibility into the package-private HAGroupInfo.
Collapses the paired-gate
getHAGroup() != null && StringUtils.isNotBlank(getHAGroupName())
to single-gate
getHAGroup() != null
at both production callsites (ScanUtil.setScanAttributeForHAForPointLookups,
MutationState.annotateMutationWithMetadata). The blank check is impossible
in practice: HAGroupInfo's ctor calls Preconditions.checkNotNull(name) and
HighAvailabilityGroup.getUrlInfo throws HA_INVALID_PROPERTIES for empty
names before any HAGroupInfo is constructed, so a non-null HAGroup with a
blank/null name is structurally impossible.
Removes the now-unused StringUtils import from ScanUtil and MutationState.
Generated-by: Claude Code (Opus 4.7)
---
.../org/apache/phoenix/execute/MutationState.java | 5 ++-
.../apache/phoenix/jdbc/HighAvailabilityGroup.java | 9 +++++
.../org/apache/phoenix/jdbc/PhoenixConnection.java | 29 +++++----------
.../phoenix/query/ConnectionQueryServices.java | 6 ++--
.../java/org/apache/phoenix/util/ScanUtil.java | 8 ++---
.../phoenix/replication/ReplicationLogGroupIT.java | 41 ++++++++++++++++++++++
6 files changed, 66 insertions(+), 32 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index ed3652c826..84f4c8a72d 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -57,7 +57,6 @@ import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.Immutable;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -947,8 +946,8 @@ public class MutationState implements SQLCloseable {
table.getLastDDLTimestamp() != null ?
Bytes.toBytes(table.getLastDDLTimestamp()) : null;
byte[] haGroupName = null;
// Only set haGroupName if connection is part of HA Connection
- if (connection.getHAGroup() != null &&
StringUtils.isNotBlank(connection.getHAGroupName())) {
- haGroupName = Bytes.toBytes(connection.getHAGroupName());
+ if (connection.getHAGroup() != null) {
+ haGroupName = Bytes.toBytes(connection.getHAGroup().getName());
}
WALAnnotationUtil.annotateMutation(mutation, tenantId, schemaName,
tableName, tableType,
lastDDLTimestamp, haGroupName);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
index 6b9728e797..29dbf21dc8 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
@@ -745,6 +745,15 @@ public class HighAvailabilityGroup {
return info;
}
+ /**
+ * Returns the HA group name. Convenience accessor — equivalent to
+ * {@code getGroupInfo().getName()} but accessible to callers outside this
package
+ * ({@link HAGroupInfo} is package-private).
+ */
+ public String getName() {
+ return info.getName();
+ }
+
@VisibleForTesting
public Properties getProperties() {
return properties;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 07c4b2d419..970f3babae 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -185,9 +185,10 @@ public class PhoenixConnection
private LogLevel auditLogLevel;
private Double logSamplingRate;
private String sourceOfOperation;
- @Nullable // Only available for connection which is part of HA Connections
- private String haGroupName;
- @Nullable // Only available for connection which is part of HA Connections
+ @Nullable
+ // Resolved HA group object used for failover orchestration and cluster-role
+ // lookups. Set on root HA connections; cloned connections inherit it via the
+ // copy constructors.
private HighAvailabilityGroup haGroup;
private volatile SQLException reasonForClose;
private static final String[] CONNECTION_PROPERTIES;
@@ -219,7 +220,7 @@ public class PhoenixConnection
boolean isRunningUpgrade) throws SQLException {
this(connection.getQueryServices(), connection.getURL(),
connection.getClientInfo(),
connection.getMutationState(), isDescRowKeyOrderUpgrade,
isRunningUpgrade,
- connection.buildingIndex, true, null);
+ connection.buildingIndex, true, connection.getHAGroup());
this.isAutoCommit = connection.isAutoCommit;
this.isAutoFlush = connection.isAutoFlush;
this.sampler = connection.sampler;
@@ -234,7 +235,7 @@ public class PhoenixConnection
throws SQLException {
this(connection.getQueryServices(), connection.getURL(),
connection.getClientInfo(),
mutationState, connection.isDescVarLengthRowKeyUpgrade(),
connection.isRunningUpgrade(),
- connection.buildingIndex, true, null);
+ connection.buildingIndex, true, connection.getHAGroup());
}
public PhoenixConnection(PhoenixConnection connection, long scn) throws
SQLException {
@@ -244,7 +245,7 @@ public class PhoenixConnection
public PhoenixConnection(PhoenixConnection connection, Properties props)
throws SQLException {
this(connection.getQueryServices(), connection.getURL(), props,
connection.getMutationState(),
connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(),
- connection.buildingIndex, true, null);
+ connection.buildingIndex, true, connection.getHAGroup());
this.isAutoCommit = connection.isAutoCommit;
this.isAutoFlush = connection.isAutoFlush;
this.sampler = connection.sampler;
@@ -264,7 +265,7 @@ public class PhoenixConnection
public PhoenixConnection(PhoenixConnection connection,
ConnectionQueryServices services,
Properties info) throws SQLException {
this(services, connection.url, info, null,
connection.isDescVarLengthRowKeyUpgrade(),
- connection.isRunningUpgrade(), connection.buildingIndex, true, null);
+ connection.isRunningUpgrade(), connection.buildingIndex, true,
connection.getHAGroup());
}
private PhoenixConnection(ConnectionQueryServices services, String url,
Properties info,
@@ -286,8 +287,6 @@ public class PhoenixConnection
// Copy so client cannot change
this.info = PropertiesUtil.deepCopy(info);
- // get HAGroupName from the connection info
- this.haGroupName =
info.getProperty(HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR);
this.haGroup = haGroup;
final PName tenantId = JDBCUtil.getTenantId(url, info);
if (this.info.isEmpty() && tenantId == null) {
@@ -1188,18 +1187,6 @@ public class PhoenixConnection
this.consistency = val;
}
- /**
- * This is temporary method to set HAGroupName for the connection. This will
be removed once we
- * have a proper way to set HAGroupName in the connection.
- */
- public void setHAGroupName(String haGroupName) {
- this.haGroupName = haGroupName;
- }
-
- public String getHAGroupName() {
- return this.haGroupName;
- }
-
/**
* Get the HAGroup for which this connection is part of, This will return
null for Single Cluster
* Connections
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 1eb2716c3e..37e0f1d229 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -148,8 +148,10 @@ public interface ConnectionQueryServices extends
QueryServices, MetaDataMutated
public PhoenixConnection connect(String url, Properties info) throws
SQLException;
- public PhoenixConnection connect(String url, Properties info,
HighAvailabilityGroup haGroup)
- throws SQLException;
+ default PhoenixConnection connect(String url, Properties info,
HighAvailabilityGroup haGroup)
+ throws SQLException {
+ return connect(url, info);
+ }
/**
* @param tableTimestamp timestamp of table if its present in the client
side cache
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index a139952581..b9f47ac329 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -41,7 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -1516,12 +1515,9 @@ public class ScanUtil {
*/
public static void setScanAttributeForHAForPointLookups(Scan scan,
PhoenixConnection conn,
StatementContext context) {
- if (
- context.getScanRanges().isPointLookup() && conn.getHAGroup() != null
- && StringUtils.isNotBlank(conn.getHAGroupName())
- ) {
+ if (context.getScanRanges().isPointLookup() && conn.getHAGroup() != null) {
scan.setAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB,
- Bytes.toBytes(conn.getHAGroupName()));
+ Bytes.toBytes(conn.getHAGroup().getName()));
}
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index 776fbcb509..cb4ca7bcee 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -273,6 +273,47 @@ public class ReplicationLogGroupIT extends HABaseIT {
}
}
+ @Test
+ public void testUpsertSelectReplicatesViaCloneConnection() throws Exception {
+ final String sourceTable = "T_" + generateUniqueName();
+ final String targetTable = "T_" + generateUniqueName();
+ final String createSourceDdl = String.format(
+ "create table if not exists %s (id integer not null primary key, val
varchar)", sourceTable);
+ final String createTargetDdl = String.format(
+ "create table if not exists %s (id integer not null primary key, val
varchar)", targetTable);
+
+ // Must exceed the default MUTATE_BATCH_SIZE (100) so the in-loop flush at
+ // UpsertCompiler.upsertSelect line ~288 fires. That flush calls send() on
the cloned
+ // connection's MutationState — the only path where the missing haGroup
field is observable.
+ // For smaller row counts the chunk's mutations are joined back to the
parent and the parent's
+ // commit does the annotation with its (non-null) haGroup, so the bug
stays hidden.
+ final int rowCount = 250;
+ try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection)
DriverManager
+ .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
+ conn.createStatement().execute(createSourceDdl);
+ conn.createStatement().execute(createTargetDdl);
+ PreparedStatement insert =
+ conn.prepareStatement("upsert into " + sourceTable + " values (?, ?)");
+ for (int i = 0; i < rowCount; i++) {
+ insert.setInt(1, i);
+ insert.setString(2, "v" + i);
+ insert.executeUpdate();
+ }
+ conn.commit();
+
+ conn.setAutoCommit(true);
+ conn.createStatement()
+ .execute("upsert into " + targetTable + " select id, val from " +
sourceTable);
+ }
+
+ Map<String, Integer> expected = Maps.newHashMap();
+ expected.put(sourceTable, rowCount); // direct upserts on the parent
connection
+ expected.put(targetTable, rowCount); // upsert-select rows — currently
fails: gets 0
+ expected.put(SYSTEM_CATALOG_NAME, 0);
+ expected.put(SYSTEM_CHILD_LINK_NAME, 0);
+ verifyReplication(expected);
+ }
+
@Test
public void testAppendAndSync() throws Exception {
final String tableName = "T_" + generateUniqueName();