This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 2adeaeae47 PHOENIX-7602 Replication Log Writer (Store and Forward
mode) (Addendum) (#2370)
2adeaeae47 is described below
commit 2adeaeae479a54389bc0c9fffaae76b7372c5fe9
Author: tkhurana <[email protected]>
AuthorDate: Tue Feb 10 15:38:22 2026 -0800
PHOENIX-7602 Replication Log Writer (Store and Forward mode) (Addendum)
(#2370)
---
.../phoenix/replication/ReplicationLogGroupIT.java | 60 +++++++++++++---------
1 file changed, 35 insertions(+), 25 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index 0ab30b1c8e..f8664ad978 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -22,13 +22,11 @@ import static
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvai
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
import static org.apache.phoenix.query.BaseTest.generateUniqueName;
-import static
org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED;
import static
org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -36,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
@@ -51,6 +48,7 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.FailoverPhoenixConnection;
+import org.apache.phoenix.jdbc.HABaseIT;
import org.apache.phoenix.jdbc.HAGroupStoreRecord;
import org.apache.phoenix.jdbc.HighAvailabilityGroup;
import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
@@ -65,12 +63,10 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,20 +74,9 @@ import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@Category(NeedsOwnMiniClusterTest.class)
-public class ReplicationLogGroupIT {
+public class ReplicationLogGroupIT extends HABaseIT {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationLogGroupIT.class);
- private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair
CLUSTERS =
- new HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
- @ClassRule
- public static TemporaryFolder standbyFolder = new TemporaryFolder();
- @ClassRule
- public static TemporaryFolder localFolder = new TemporaryFolder();
-
- private static Configuration conf1;
- private static Configuration conf2;
- private static URI standbyUri;
- private static URI fallbackUri;
private static String zkUrl;
private static String peerZkUrl;
@@ -106,14 +91,6 @@ public class ReplicationLogGroupIT {
@BeforeClass
public static void doSetup() throws Exception {
- conf1 = CLUSTERS.getHBaseCluster1().getConfiguration();
- conf1.setBoolean(SYNCHRONOUS_REPLICATION_ENABLED, true);
- conf2 = CLUSTERS.getHBaseCluster2().getConfiguration();
- conf2.setBoolean(SYNCHRONOUS_REPLICATION_ENABLED, true);
- standbyUri = new Path(standbyFolder.getRoot().toString()).toUri();
- fallbackUri = new Path(localFolder.getRoot().toString()).toUri();
- conf1.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY,
standbyUri.toString());
- conf1.set(ReplicationLogGroup.REPLICATION_FALLBACK_HDFS_URL_KEY,
fallbackUri.toString());
conf1.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 20);
CLUSTERS.start();
zkUrl = CLUSTERS.getZkUrl1();
@@ -313,6 +290,39 @@ public class ReplicationLogGroupIT {
}
}
+ @Test
+ public void testAppendAndSyncNoIndex() throws Exception {
+ final String tableName = "T_" + generateUniqueName();
+ try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection)
DriverManager
+ .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
+ String ddl = String.format("create table %s (id1 integer not null, "
+ + "id2 integer not null, val1 varchar, val2 varchar "
+ + "constraint pk primary key (id1, id2))", tableName);
+ conn.createStatement().execute(ddl);
+ conn.commit();
+ PreparedStatement stmt =
+ conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?,
?)");
+ // upsert 50 rows as 5 committed batches of 10 (id1 = batch, id2 = row); val2 is always null
+ int rowCount = 50;
+ for (int i = 0; i < 5; ++i) {
+ for (int j = 0; j < 10; ++j) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, j);
+ stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+ stmt.setString(4, null);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
+ // verify replication against the expected per-table mutation counts
+ Map<String, Integer> expected = Maps.newHashMap();
+ // expected count is rowCount * 2 because, for each upserted row, two
mutations will be
+ // replicated: a Put and a Delete (presumably the Delete is for the null val2 - confirm)
+ expected.put(tableName, rowCount * 2); // Put + Delete
+ verifyReplication(expected);
+ }
+ }
+
/**
* This test simulates RS crashes in the middle of write transactions after
the edits have been
* written to the WAL but before they have been replicated to the standby
cluster. Those edits