This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 2adeaeae47 PHOENIX-7602 Replication Log Writer (Store and Forward 
mode) (Addendum) (#2370)
2adeaeae47 is described below

commit 2adeaeae479a54389bc0c9fffaae76b7372c5fe9
Author: tkhurana <[email protected]>
AuthorDate: Tue Feb 10 15:38:22 2026 -0800

    PHOENIX-7602 Replication Log Writer (Store and Forward mode) (Addendum) 
(#2370)
---
 .../phoenix/replication/ReplicationLogGroupIT.java | 60 +++++++++++++---------
 1 file changed, 35 insertions(+), 25 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index 0ab30b1c8e..f8664ad978 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -22,13 +22,11 @@ import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvai
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
 import static org.apache.phoenix.query.BaseTest.generateUniqueName;
-import static 
org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED;
 import static 
org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.net.URI;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -36,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
@@ -51,6 +48,7 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.FailoverPhoenixConnection;
+import org.apache.phoenix.jdbc.HABaseIT;
 import org.apache.phoenix.jdbc.HAGroupStoreRecord;
 import org.apache.phoenix.jdbc.HighAvailabilityGroup;
 import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
@@ -65,12 +63,10 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,20 +74,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
 @Category(NeedsOwnMiniClusterTest.class)
-public class ReplicationLogGroupIT {
+public class ReplicationLogGroupIT extends HABaseIT {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogGroupIT.class);
-  private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair 
CLUSTERS =
-    new HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
 
-  @ClassRule
-  public static TemporaryFolder standbyFolder = new TemporaryFolder();
-  @ClassRule
-  public static TemporaryFolder localFolder = new TemporaryFolder();
-
-  private static Configuration conf1;
-  private static Configuration conf2;
-  private static URI standbyUri;
-  private static URI fallbackUri;
   private static String zkUrl;
   private static String peerZkUrl;
 
@@ -106,14 +91,6 @@ public class ReplicationLogGroupIT {
 
   @BeforeClass
   public static void doSetup() throws Exception {
-    conf1 = CLUSTERS.getHBaseCluster1().getConfiguration();
-    conf1.setBoolean(SYNCHRONOUS_REPLICATION_ENABLED, true);
-    conf2 = CLUSTERS.getHBaseCluster2().getConfiguration();
-    conf2.setBoolean(SYNCHRONOUS_REPLICATION_ENABLED, true);
-    standbyUri = new Path(standbyFolder.getRoot().toString()).toUri();
-    fallbackUri = new Path(localFolder.getRoot().toString()).toUri();
-    conf1.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, 
standbyUri.toString());
-    conf1.set(ReplicationLogGroup.REPLICATION_FALLBACK_HDFS_URL_KEY, 
fallbackUri.toString());
     conf1.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 20);
     CLUSTERS.start();
     zkUrl = CLUSTERS.getZkUrl1();
@@ -313,6 +290,39 @@ public class ReplicationLogGroupIT {
     }
   }
 
+  @Test
+  public void testAppendAndSyncNoIndex() throws Exception {
+    final String tableName = "T_" + generateUniqueName();
+    try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection) 
DriverManager
+      .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
+      String ddl = String.format("create table %s (id1 integer not null, "
+        + "id2 integer not null, val1 varchar, val2 varchar "
+        + "constraint pk primary key (id1, id2))", tableName);
+      conn.createStatement().execute(ddl);
+      conn.commit();
+      PreparedStatement stmt =
+        conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?, 
?)");
+      // upsert 50 rows
+      int rowCount = 50;
+      for (int i = 0; i < 5; ++i) {
+        for (int j = 0; j < 10; ++j) {
+          stmt.setInt(1, i);
+          stmt.setInt(2, j);
+          stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+          stmt.setString(4, null);
+          stmt.executeUpdate();
+        }
+        conn.commit();
+      }
+      // verify replication
+      Map<String, Integer> expected = Maps.newHashMap();
+      // mutation count will be equal to row count since the atomic upsert 
mutations will be
+      // ignored and therefore not replicated
+      expected.put(tableName, rowCount * 2); // Put + Delete
+      verifyReplication(expected);
+    }
+  }
+
   /**
    * This test simulates RS crashes in the middle of write transactions after 
the edits have been
    * written to the WAL but before they have been replicated to the standby 
cluster. Those edits

Reply via email to