This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new 28d2763c41 PHOENIX-7813 Align ReplicationLogDiscovery scheduler to round boundaries to fix file distribution imbalance across RS (#2432)
28d2763c41 is described below

commit 28d2763c4177097801a6be90bc06ca2820f476f8
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Tue May 5 18:34:28 2026 +0530

    PHOENIX-7813 Align ReplicationLogDiscovery scheduler to round boundaries to fix file distribution imbalance across RS (#2432)
---
 .../replication/ReplicationLogDiscovery.java       |  39 ++++--
 .../reader/ReplicationLogDiscoveryReplay.java      |  17 ---
 .../ReplicationLogDiscoveryReplayTestIT.java       |  32 +++--
 .../replication/ReplicationLogDiscoveryTest.java   | 153 ++++++++++++++++++++-
 4 files changed, 197 insertions(+), 44 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 175de94f87..3c64e9ec61 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -65,11 +65,6 @@ public abstract class ReplicationLogDiscovery {
    */
   private static final String DEFAULT_EXECUTOR_THREAD_NAME_FORMAT = 
"ReplicationLogDiscovery-%d";
 
-  /**
-   * Default interval in seconds between replay operations
-   */
-  private static final long DEFAULT_REPLAY_INTERVAL_SECONDS = 10;
-
   /**
    * Default timeout in seconds for graceful shutdown of the executor service
    */
@@ -150,13 +145,17 @@ public abstract class ReplicationLogDiscovery {
       // Initialize and schedule the executors
       scheduler = Executors.newScheduledThreadPool(getExecutorThreadCount(),
         new 
ThreadFactoryBuilder().setNameFormat(getExecutorThreadNameFormat()).build());
+      long initialDelayMs = computeAlignedInitialDelay();
+      long replayIntervalMs = getReplayIntervalMillis();
+      LOG.info("Scheduling replay for haGroup: {} with initialDelay={}ms, 
interval={}ms",
+        haGroupName, initialDelayMs, replayIntervalMs);
       scheduler.scheduleAtFixedRate(() -> {
         try {
           replay();
         } catch (Exception e) {
           LOG.error("Error during replay", e);
         }
-      }, 0, getReplayIntervalSeconds(), TimeUnit.SECONDS);
+      }, initialDelayMs, replayIntervalMs, TimeUnit.MILLISECONDS);
 
       isRunning = true;
       LOG.info("ReplicationLogDiscovery started for haGroup: {}", haGroupName);
@@ -293,7 +292,7 @@ public abstract class ReplicationLogDiscovery {
     long startTime = EnvironmentEdgeManager.currentTime();
     List<Path> files = 
replicationLogTracker.getNewFilesForRound(replicationRound);
     LOG.info("Number of new files for round {} is {}", replicationRound, 
files.size());
-    while (!files.isEmpty()) {
+    while (!files.isEmpty() && isRunning()) {
       processOneRandomFile(files);
       files = replicationLogTracker.getNewFilesForRound(replicationRound);
     }
@@ -323,7 +322,7 @@ public abstract class ReplicationLogDiscovery {
     LOG.info("Number of {} files with renameTimestampThreshold {} is {} for 
haGroup: {}",
       replicationLogTracker.getInProgressLogSubDirectoryName(), 
renameTimestampThreshold,
       files.size(), haGroupName);
-    while (!files.isEmpty()) {
+    while (!files.isEmpty() && isRunning()) {
       Optional<Path> failedFile = processOneRandomFile(files);
       if (failedFile.isPresent()) {
         String prefix = replicationLogTracker.getFilePrefix(failedFile.get());
@@ -472,12 +471,12 @@ public abstract class ReplicationLogDiscovery {
   }
 
   /**
-   * Returns the replay interval in seconds. Subclasses can override this 
method to provide custom
-   * intervals.
-   * @return The replay interval in seconds (default: 10 seconds).
+   * Returns the replay interval in milliseconds. Subclasses can override this 
method to provide
+   * custom intervals. Defaults to the round duration.
+   * @return The replay interval in milliseconds.
    */
-  public long getReplayIntervalSeconds() {
-    return DEFAULT_REPLAY_INTERVAL_SECONDS;
+  public long getReplayIntervalMillis() {
+    return roundTimeMills;
   }
 
   /**
@@ -507,6 +506,20 @@ public abstract class ReplicationLogDiscovery {
     return DEFAULT_WAITING_BUFFER_PERCENTAGE;
   }
 
+  /**
+   * Computes initial delay to align the scheduler to round-eligible 
boundaries so all RS wake up at
+   * the same wall-clock moment. A round becomes eligible when currentTime >= 
roundEndTime +
+   * bufferMillis, and rounds repeat every roundTimeMills. This gives a 
universal grid of eligible
+   * ticks at bufferMillis, bufferMillis + roundTimeMills, bufferMillis + 
2*roundTimeMills, etc.
+   * from epoch. All RS compute the same grid regardless of when start() is 
called.
+   * @return the initial delay in milliseconds until the next round-eligible 
tick
+   */
+  protected long computeAlignedInitialDelay() {
+    long now = EnvironmentEdgeManager.currentTime();
+    long elapsed = (now - bufferMillis) % roundTimeMills;
+    return (elapsed == 0) ? 0 : roundTimeMills - elapsed;
+  }
+
   public int getInProgressFileMaxRetries() {
     return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
       DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index f20ff7ae21..4d5f886d51 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -57,12 +57,6 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
   public static final String EXECUTOR_THREAD_NAME_FORMAT =
     "Phoenix-ReplicationLogDiscoveryReplay-%d";
 
-  /**
-   * Configuration key for replay interval in seconds
-   */
-  public static final String REPLICATION_REPLAY_INTERVAL_SECONDS_KEY =
-    "phoenix.replication.replay.interval.seconds";
-
   /**
    * Configuration key for shutdown timeout in seconds
    */
@@ -87,11 +81,6 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
   public static final String REPLICATION_REPLAY_WAITING_BUFFER_PERCENTAGE_KEY =
     "phoenix.replication.replay.waiting.buffer.percentage";
 
-  /**
-   * Default replay interval in seconds. Controls how frequently the replay 
process runs.
-   */
-  public static final long DEFAULT_REPLAY_INTERVAL_SECONDS = 60;
-
   /**
    * Default shutdown timeout in seconds. Maximum time to wait for executor 
service to shutdown
    * gracefully.
@@ -422,12 +411,6 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
     return EXECUTOR_THREAD_NAME_FORMAT;
   }
 
-  @Override
-  public long getReplayIntervalSeconds() {
-    return getConf().getLong(REPLICATION_REPLAY_INTERVAL_SECONDS_KEY,
-      DEFAULT_REPLAY_INTERVAL_SECONDS);
-  }
-
   @Override
   public long getShutdownTimeoutSeconds() {
     return getConf().getLong(REPLICATION_REPLAY_SHUTDOWN_TIMEOUT_SECONDS_KEY,
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index 8d7568f422..628652d001 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -113,24 +113,32 @@ public class ReplicationLogDiscoveryReplayTestIT extends HABaseIT {
   }
 
   /**
-   * Tests the replay interval configuration with default and custom values.
+   * Tests that replay interval always matches the configured round duration.
    */
   @Test
-  public void testGetReplayIntervalSeconds() throws IOException {
-    // Create ReplicationLogDiscoveryReplay instance
+  public void testGetReplayIntervalMillis() throws IOException {
+    // Test with default round duration
     TestableReplicationLogTracker fileTracker =
       createReplicationLogTracker(conf1, haGroupName, rootFs, rootUri);
     ReplicationLogDiscoveryReplay discovery = new 
ReplicationLogDiscoveryReplay(fileTracker);
+    long expectedRoundMillis =
+      
fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds()
+        * 1000L;
+    assertEquals("Replay interval should match round duration", 
expectedRoundMillis,
+      discovery.getReplayIntervalMillis());
 
-    // Test default value when no custom config is set
-    long defaultResult = discovery.getReplayIntervalSeconds();
-    assertEquals("Should return default value when no custom config is set",
-      ReplicationLogDiscoveryReplay.DEFAULT_REPLAY_INTERVAL_SECONDS, 
defaultResult);
-
-    // Test custom value when config is set
-    
conf1.setLong(ReplicationLogDiscoveryReplay.REPLICATION_REPLAY_INTERVAL_SECONDS_KEY,
 120L);
-    long customResult = discovery.getReplayIntervalSeconds();
-    assertEquals("Should return custom value when config is set", 120L, 
customResult);
+    // Test with custom round duration
+    
conf1.setInt(ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
+      120);
+    try {
+      TestableReplicationLogTracker fileTracker2 =
+        createReplicationLogTracker(conf1, haGroupName, rootFs, rootUri);
+      ReplicationLogDiscoveryReplay discovery2 = new 
ReplicationLogDiscoveryReplay(fileTracker2);
+      assertEquals("Replay interval should match custom round duration", 
120_000L,
+        discovery2.getReplayIntervalMillis());
+    } finally {
+      
conf1.unset(ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY);
+    }
   }
 
   /**
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
index caacf175ce..6ed7d8723f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
@@ -95,6 +95,7 @@ public class ReplicationLogDiscoveryTest {
     
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
 0);
     discovery = Mockito.spy(new TestableReplicationLogDiscovery(fileTracker));
     Mockito.doReturn(metricsLogDiscovery).when(discovery).getMetrics();
+    Mockito.doReturn(true).when(discovery).isRunning();
   }
 
   @After
@@ -115,6 +116,7 @@ public class ReplicationLogDiscoveryTest {
    */
   @Test
   public void testStartAndStop() throws IOException {
+    Mockito.doCallRealMethod().when(discovery).isRunning();
     // 1. Validate that it's not running initially
     assertFalse("Discovery should not be running initially", 
discovery.isRunning());
 
@@ -138,8 +140,8 @@ public class ReplicationLogDiscoveryTest {
       threadName.contains("ReplicationLogDiscovery"));
 
     // Verify replay interval
-    long replayInterval = discovery.getReplayIntervalSeconds();
-    assertEquals("Replay interval should be 10 seconds", 10L, replayInterval);
+    long replayInterval = discovery.getReplayIntervalMillis();
+    assertEquals("Replay interval should be 60000 milliseconds", 60_000L, 
replayInterval);
 
     // 6. Ensure starting again does not create a new scheduler (and also 
should not throw any
     // exception)
@@ -159,6 +161,153 @@ public class ReplicationLogDiscoveryTest {
     assertFalse("Discovery should not be running after stop", 
discovery.isRunning());
   }
 
+  @Test
+  public void testComputeAlignedInitialDelay() {
+    long roundTimeMs = discovery.roundTimeMills;
+    long bufferMs = discovery.bufferMillis;
+
+    // RS initialize at different times within the same round window.
+    // All should align to the same next tick.
+    // With roundTimeMs=60000 and bufferMs=9000, ticks are at 9000, 69000, 
129000, ...
+    // Place all 3 RS between tick 69000 and tick 129000 so they all target 
129000.
+    AtomicLong mockTime = new AtomicLong();
+    EnvironmentEdgeManager.injectEdge(new EnvironmentEdge() {
+      @Override
+      public long currentTime() {
+        return mockTime.get();
+      }
+    });
+
+    try {
+      // RS-1 starts early in the window (10s after previous tick at 69000)
+      mockTime.set(79_000L);
+      long delay1 = discovery.computeAlignedInitialDelay();
+      long tick1 = mockTime.get() + delay1;
+
+      // RS-2 starts 30s later
+      mockTime.set(109_000L);
+      long delay2 = discovery.computeAlignedInitialDelay();
+      long tick2 = mockTime.get() + delay2;
+
+      // RS-3 starts 50s after RS-1 (just before the tick)
+      mockTime.set(128_000L);
+      long delay3 = discovery.computeAlignedInitialDelay();
+      long tick3 = mockTime.get() + delay3;
+
+      // All should align to the same tick (129000)
+      assertEquals("RS-1 and RS-2 should align to the same tick", tick1, 
tick2);
+      assertEquals("RS-2 and RS-3 should align to the same tick", tick2, 
tick3);
+      assertEquals("All should target tick at 129000", 129_000L, tick1);
+
+      // Delay should always be > 0 and <= roundTimeMs
+      assertTrue("Delay should be positive", delay1 > 0);
+      assertTrue("Delay should not exceed round time", delay1 <= roundTimeMs);
+      assertTrue("Delay should be positive", delay2 > 0);
+      assertTrue("Delay should not exceed round time", delay2 <= roundTimeMs);
+
+      // The aligned tick should be at a multiple of roundTimeMs offset by 
bufferMs
+      assertEquals("Tick should be aligned to round-eligible boundary", 0,
+        (tick1 - bufferMs) % roundTimeMs);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  @Test
+  public void testComputeAlignedInitialDelayExactlyOnTick() {
+    long roundTimeMs = discovery.roundTimeMills;
+    long bufferMs = discovery.bufferMillis;
+
+    AtomicLong mockTime = new AtomicLong();
+    EnvironmentEdgeManager.injectEdge(new EnvironmentEdge() {
+      @Override
+      public long currentTime() {
+        return mockTime.get();
+      }
+    });
+
+    try {
+      // Set time to exactly on a round-eligible tick boundary
+      long exactTick = roundTimeMs * 5 + bufferMs;
+      mockTime.set(exactTick);
+      long delay = discovery.computeAlignedInitialDelay();
+      assertEquals("Delay should be 0 when exactly on a tick", 0, delay);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  @Test
+  public void testComputeAlignedInitialDelaySlightlyBeyondBuffer() {
+    // Time is 12 seconds into the round (past the 9s buffer) — the tick has 
already passed,
+    // so we should wait until the next round-eligible tick
+    long roundTimeMs = discovery.roundTimeMills;
+    long bufferMs = discovery.bufferMillis;
+
+    AtomicLong mockTime = new AtomicLong();
+    EnvironmentEdgeManager.injectEdge(new EnvironmentEdge() {
+      @Override
+      public long currentTime() {
+        return mockTime.get();
+      }
+    });
+
+    try {
+      // Round-eligible tick is at roundTimeMs * 5 + bufferMs
+      // Set time to 3s past that tick (12s into the round, buffer is 9s)
+      long tick = roundTimeMs * 5 + bufferMs;
+      long now = tick + 3_000L;
+      mockTime.set(now);
+      long delay = discovery.computeAlignedInitialDelay();
+
+      // Should wait until the next tick: tick + roundTimeMs
+      long expectedDelay = roundTimeMs - 3_000L;
+      assertEquals("Should wait until next round-eligible tick", 
expectedDelay, delay);
+
+      // Verify the target tick is correct
+      long targetTick = now + delay;
+      assertEquals("Target should be next tick", tick + roundTimeMs, 
targetTick);
+      assertEquals("Target should be aligned", 0, (targetTick - bufferMs) % 
roundTimeMs);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  @Test
+  public void testComputeAlignedInitialDelaySlightlyBeforeBuffer() {
+    // Time is 6 seconds into the round (before the 9s buffer) — the tick 
hasn't arrived yet,
+    // so we should wait for it
+    long roundTimeMs = discovery.roundTimeMills;
+    long bufferMs = discovery.bufferMillis;
+
+    AtomicLong mockTime = new AtomicLong();
+    EnvironmentEdgeManager.injectEdge(new EnvironmentEdge() {
+      @Override
+      public long currentTime() {
+        return mockTime.get();
+      }
+    });
+
+    try {
+      // Round-eligible tick is at roundTimeMs * 5 + bufferMs
+      // Set time to 3s before that tick (6s into the round, buffer is 9s)
+      long tick = roundTimeMs * 5 + bufferMs;
+      long now = tick - 3_000L;
+      mockTime.set(now);
+      long delay = discovery.computeAlignedInitialDelay();
+
+      // Should wait 3s until the upcoming tick
+      assertEquals("Should wait until upcoming round-eligible tick", 3_000L, 
delay);
+
+      // Verify the target tick is correct
+      long targetTick = now + delay;
+      assertEquals("Target should be the upcoming tick", tick, targetTick);
+      assertEquals("Target should be aligned", 0, (targetTick - bufferMs) % 
roundTimeMs);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
   @Test
   public void testGetInProgressFileMaxRetries() {
     // Default value

Reply via email to