This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 30aaabb457 Phoenix-7672 Adding API to get new files within range of 
rounds (#2355)
30aaabb457 is described below

commit 30aaabb457e7e606271e8a35e15f4ba0bcecc26a
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Tue Jan 27 09:28:51 2026 +0530

    Phoenix-7672 Adding API to get new files within range of rounds (#2355)
---
 .../phoenix/replication/ReplicationLogTracker.java |  28 ++
 .../reader/ReplicationLogDiscoveryReplay.java      |  43 +++-
 .../ReplicationLogDiscoveryReplayTestIT.java       |  34 ++-
 .../replication/ReplicationLogTrackerTest.java     | 286 +++++++++++++++++++++
 4 files changed, 373 insertions(+), 18 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
index 92031c1349..03832a4d85 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
@@ -151,6 +151,34 @@ public class ReplicationLogTracker {
     return filesInRound;
   }
 
+  /**
+   * Retrieves new replication log files that belong to replication rounds 
from startRound to
+   * endRound (inclusive). Iterates through all rounds in the range and 
collects valid log files
+   * from each round's shard directory.
+   * @param startRound - The starting replication round (inclusive)
+   * @param endRound   - The ending replication round (inclusive)
+   * @return List of valid log file paths from startRound to endRound, empty 
list if startRound >
+   *         endRound
+   * @throws IOException if there's an error accessing the file system
+   */
+  public List<Path> getNewFiles(ReplicationRound startRound, ReplicationRound 
endRound)
+    throws IOException {
+    List<Path> files = new ArrayList<>();
+    // Early return if startRound is after endRound (invalid range)
+    if (startRound.getStartTime() > endRound.getStartTime()) {
+      return files;
+    }
+    // Iterate through all rounds from startRound to endRound (exclusive of 
endRound)
+    ReplicationRound firstRound = startRound;
+    while (!firstRound.equals(endRound)) {
+      files.addAll(getNewFilesForRound(firstRound));
+      firstRound = replicationShardDirectoryManager.getNextRound(firstRound);
+    }
+    // Add the files for the endRound (inclusive)
+    files.addAll(getNewFilesForRound(endRound));
+    return files;
+  }
+
   /**
    * Retrieves all valid log files currently in the in-progress directory.
    * @return List of valid log file paths in the in-progress directory, empty 
list if directory
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index 36daca8b97..b6ce96e62c 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.jdbc.HAGroupStoreRecord;
 import org.apache.phoenix.replication.ReplicationLogDiscovery;
 import org.apache.phoenix.replication.ReplicationLogTracker;
 import org.apache.phoenix.replication.ReplicationRound;
+import org.apache.phoenix.replication.ReplicationShardDirectoryManager;
 import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery;
 import 
org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl;
 import org.slf4j.Logger;
@@ -455,15 +456,47 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
    * Determines whether failover should be triggered based on completion 
criteria. Failover is safe
    * to trigger when all of the following conditions are met: 1. A failover 
has been requested
    * (failoverPending is true) 2. No files are currently in the in-progress 
directory 3. No new
-   * files exist for ongoing round These conditions ensure all replication 
logs have been processed
-   * before transitioning the cluster from STANDBY to ACTIVE state.
+   * files exist from the next round to process up to the current timestamp 
round. The third
+   * condition checks for new files in the range from nextRoundToProcess 
(derived from
+   * getLastRoundProcessed()) to currentTimestampRound (derived from current 
time). This ensures all
+   * replication logs up to the current time have been processed before 
transitioning the cluster
+   * from STANDBY to ACTIVE state.
    * @return true if all conditions are met and failover should be triggered, 
false otherwise
    * @throws IOException if there's an error checking file status
    */
   protected boolean shouldTriggerFailover() throws IOException {
-    return failoverPending.get() && 
replicationLogTracker.getInProgressFiles().isEmpty()
-      && replicationLogTracker.getNewFilesForRound(replicationLogTracker
-        
.getReplicationShardDirectoryManager().getNextRound(getLastRoundProcessed())).isEmpty();
+    LOG.debug("Checking if failover should be triggered. failoverPending={}", 
failoverPending);
+    // Check if failover has been requested
+    if (!failoverPending.get()) {
+      LOG.debug("Failover not triggered. failoverPending is false.");
+      return false;
+    }
+    // Check if in-progress directory is empty
+    boolean isInProgressDirectoryEmpty = 
replicationLogTracker.getInProgressFiles().isEmpty();
+    if (!isInProgressDirectoryEmpty) {
+      LOG.debug("Failover not triggered. In progress directory is not empty.");
+      return false;
+    }
+    // Check if there are any new files from next round to current timestamp 
round
+    ReplicationShardDirectoryManager replicationShardDirectoryManager =
+      replicationLogTracker.getReplicationShardDirectoryManager();
+    ReplicationRound nextRoundToProcess =
+      replicationShardDirectoryManager.getNextRound(getLastRoundProcessed());
+    ReplicationRound currentTimestampRound = replicationShardDirectoryManager
+      .getReplicationRoundFromStartTime(EnvironmentEdgeManager.currentTime());
+    LOG.debug("Checking the new files from next round {} to current timestamp 
round {}.",
+      nextRoundToProcess, currentTimestampRound);
+    boolean isInDirectoryEmpty =
+      replicationLogTracker.getNewFiles(nextRoundToProcess, 
currentTimestampRound).isEmpty();
+
+    if (!isInDirectoryEmpty) {
+      LOG.debug(
+        "Failover not triggered. New files exist from next round to current " 
+ "timestamp round.");
+      return false;
+    }
+
+    LOG.info("Failover can be triggered.");
+    return true;
   }
 
   protected void triggerFailover() {
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index 7668838026..27d62e71ea 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -1816,7 +1816,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
 
   /**
    * Tests the shouldTriggerFailover method with various combinations of 
failoverPending,
-   * in-progress files, and new files for next round.
+   * in-progress files, and new files from next round to current timestamp 
round.
    */
   @Test
   public void testShouldTriggerFailover() throws IOException {
@@ -1838,13 +1838,16 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
     try {
       // Create test rounds
       ReplicationRound testRound = new ReplicationRound(1704153600000L, 
1704153660000L);
-      ReplicationRound nextRound =
-        tracker.getReplicationShardDirectoryManager().getNextRound(testRound);
+      ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+      ReplicationRound nextRoundToProcess = 
shardManager.getNextRound(testRound);
+      ReplicationRound currentTimestampRound =
+        shardManager.getReplicationRoundFromStartTime(currentTime);
 
       // Test Case 1: All conditions true - should return true
       {
         when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-        
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+        when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
+          .thenReturn(Collections.emptyList());
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
         discovery.setLastRoundInSync(testRound);
@@ -1858,7 +1861,8 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       // Test Case 2: failoverPending is false - should return false
       {
         when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-        
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+        when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
+          .thenReturn(Collections.emptyList());
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
         discovery.setLastRoundInSync(testRound);
@@ -1873,7 +1877,8 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       {
         when(tracker.getInProgressFiles())
           .thenReturn(Collections.singletonList(new Path("test.plog")));
-        
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+        when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
+          .thenReturn(Collections.emptyList());
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
         discovery.setLastRoundInSync(testRound);
@@ -1884,10 +1889,11 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
           discovery.shouldTriggerFailover());
       }
 
-      // Test Case 4: new files exist for next round - should return false
+      // Test Case 4: new files exist from next round to current timestamp 
round - should return
+      // false
       {
         when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-        when(tracker.getNewFilesForRound(nextRound))
+        when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
           .thenReturn(Collections.singletonList(new Path("test.plog")));
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
@@ -1895,7 +1901,8 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         discovery.setLastRoundProcessed(testRound);
         discovery.setFailoverPending(true);
 
-        assertFalse("Should not trigger failover when new files exist for next 
round",
+        assertFalse(
+          "Should not trigger failover when new files exist from next round to 
current timestamp round",
           discovery.shouldTriggerFailover());
       }
 
@@ -1903,7 +1910,8 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       {
         when(tracker.getInProgressFiles())
           .thenReturn(Collections.singletonList(new Path("test.plog")));
-        
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+        when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
+          .thenReturn(Collections.emptyList());
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
         discovery.setLastRoundInSync(testRound);
@@ -1918,7 +1926,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       // Test Case 6: failoverPending false AND new files exist - should 
return false
       {
         when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-        when(tracker.getNewFilesForRound(nextRound))
+        when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
           .thenReturn(Collections.singletonList(new Path("test.plog")));
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
@@ -1934,7 +1942,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       {
         when(tracker.getInProgressFiles())
           .thenReturn(Collections.singletonList(new Path("test1.plog")));
-        when(tracker.getNewFilesForRound(nextRound))
+        when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
           .thenReturn(Collections.singletonList(new Path("test2.plog")));
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
@@ -1950,7 +1958,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       {
         when(tracker.getInProgressFiles())
           .thenReturn(Collections.singletonList(new Path("test.plog")));
-        when(tracker.getNewFilesForRound(nextRound))
+        when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
           .thenReturn(Collections.singletonList(new Path("test2.plog")));
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
index 2a2b6055ae..2037b25c0e 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
@@ -1362,6 +1362,292 @@ public class ReplicationLogTrackerTest {
       resultFilenames.contains(invalidFile3.getName()));
   }
 
+  @Test
+  public void testGetNewFilesWithStartAndEndRound() throws IOException {
+    // Initialize tracker
+    tracker.init();
+
+    ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+    long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 
1000L;
+
+    // Create start round (60 seconds duration)
+    long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00
+    long startRoundEndTime = startRoundStartTime + roundDurationMs;
+    ReplicationRound startRound = new ReplicationRound(startRoundStartTime, 
startRoundEndTime);
+
+    // Create middle round
+    long middleRoundStartTime = startRoundEndTime;
+    long middleRoundEndTime = middleRoundStartTime + roundDurationMs;
+    ReplicationRound middleRound = new ReplicationRound(middleRoundStartTime, 
middleRoundEndTime);
+
+    // Create end round
+    long endRoundStartTime = middleRoundEndTime;
+    long endRoundEndTime = endRoundStartTime + roundDurationMs;
+    ReplicationRound endRound = new ReplicationRound(endRoundStartTime, 
endRoundEndTime);
+
+    // Get shard directories for each round
+    Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+    Path middleRoundShardDir = shardManager.getShardDirectory(middleRound);
+    Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+
+    // Create shard directories
+    localFs.mkdirs(startRoundShardDir);
+    localFs.mkdirs(middleRoundShardDir);
+    localFs.mkdirs(endRoundShardDir);
+
+    // Create files in start round
+    Path startRoundFile1 = new Path(startRoundShardDir, startRoundStartTime + 
"_rs1.plog");
+    Path startRoundFile2 = new Path(startRoundShardDir, startRoundStartTime + 
30000 + "_rs2.plog");
+
+    // Create files in middle round
+    Path middleRoundFile1 = new Path(middleRoundShardDir, middleRoundStartTime 
+ "_rs3.plog");
+    Path middleRoundFile2 =
+      new Path(middleRoundShardDir, middleRoundStartTime + 30000 + 
"_rs4.plog");
+
+    // Create files in end round
+    Path endRoundFile1 = new Path(endRoundShardDir, endRoundStartTime + 
"_rs5.plog");
+    Path endRoundFile2 = new Path(endRoundShardDir, endRoundStartTime + 30000 
+ "_rs6.plog");
+
+    // Create files outside the range (before start round)
+    ReplicationRound beforeStartRound = 
shardManager.getPreviousRound(startRound);
+    Path beforeStartRoundShardDir = 
shardManager.getShardDirectory(beforeStartRound);
+    localFs.mkdirs(beforeStartRoundShardDir);
+    Path beforeStartRoundFile =
+      new Path(beforeStartRoundShardDir, beforeStartRound.getStartTime() + 
"_rs0.plog");
+
+    // Create files outside the range (after end round)
+    ReplicationRound afterEndRound = shardManager.getNextRound(endRound);
+    Path afterEndRoundShardDir = shardManager.getShardDirectory(afterEndRound);
+    localFs.mkdirs(afterEndRoundShardDir);
+    Path afterEndRoundFile =
+      new Path(afterEndRoundShardDir, afterEndRound.getStartTime() + 
"_rs7.plog");
+
+    // Create all files
+    localFs.create(startRoundFile1, true).close();
+    localFs.create(startRoundFile2, true).close();
+    localFs.create(middleRoundFile1, true).close();
+    localFs.create(middleRoundFile2, true).close();
+    localFs.create(endRoundFile1, true).close();
+    localFs.create(endRoundFile2, true).close();
+    localFs.create(beforeStartRoundFile, true).close();
+    localFs.create(afterEndRoundFile, true).close();
+
+    // Call getNewFiles with startRound and endRound
+    List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+    // Verify file system operations
+    // Should call getNewFilesForRound for each round (start, middle, end)
+    // Each call to getNewFilesForRound calls exists() and listStatus() on 
shard directories
+    // Note: init() already called exists() on in-progress directory
+    Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir));
+    Mockito.verify(mockFs, times(1)).exists(Mockito.eq(middleRoundShardDir));
+    Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir));
+    Mockito.verify(mockFs, times(3)).listStatus(Mockito.any(Path.class));
+
+    // Prepare expected set of file paths (should include files from start, 
middle, and end rounds)
+    Set<String> expectedPaths = new HashSet<>();
+    expectedPaths.add(startRoundFile1.toString());
+    expectedPaths.add(startRoundFile2.toString());
+    expectedPaths.add(middleRoundFile1.toString());
+    expectedPaths.add(middleRoundFile2.toString());
+    expectedPaths.add(endRoundFile1.toString());
+    expectedPaths.add(endRoundFile2.toString());
+
+    // Create actual set of paths
+    Set<String> actualPaths =
+      result.stream().map(path -> 
path.toUri().getPath()).collect(Collectors.toSet());
+
+    // Verify all files from start to end rounds are returned
+    assertEquals("Should return exactly 6 files from start to end rounds", 
expectedPaths.size(),
+      actualPaths.size());
+    assertEquals("File paths do not match", expectedPaths, actualPaths);
+
+    // Verify files outside the range are not included
+    assertFalse("Should not contain file from before start round",
+      actualPaths.contains(beforeStartRoundFile.toString()));
+    assertFalse("Should not contain file from after end round",
+      actualPaths.contains(afterEndRoundFile.toString()));
+  }
+
+  @Test
+  public void testGetNewFilesWithSameStartAndEndRound() throws IOException {
+    // Initialize tracker
+    tracker.init();
+
+    ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+    long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 
1000L;
+
+    // Create a single round
+    long roundStartTime = 1704153600000L; // 2024-01-02 00:00:00
+    long roundEndTime = roundStartTime + roundDurationMs;
+    ReplicationRound round = new ReplicationRound(roundStartTime, 
roundEndTime);
+
+    // Get shard directory for this round
+    Path roundShardDir = shardManager.getShardDirectory(round);
+    localFs.mkdirs(roundShardDir);
+
+    // Create files in the round
+    Path file1 = new Path(roundShardDir, roundStartTime + "_rs1.plog");
+    Path file2 = new Path(roundShardDir, roundStartTime + 30000 + "_rs2.plog");
+    Path file3 = new Path(roundShardDir, roundStartTime + 50000 + "_rs3.plog");
+
+    // Create files
+    localFs.create(file1, true).close();
+    localFs.create(file2, true).close();
+    localFs.create(file3, true).close();
+
+    // Call getNewFiles with same start and end round
+    List<Path> result = tracker.getNewFiles(round, round);
+
+    // Verify file system operations
+    // Should call getNewFilesForRound once for the round
+    // Note: init() already called exists() on in-progress directory
+    Mockito.verify(mockFs, times(1)).exists(Mockito.eq(roundShardDir));
+    Mockito.verify(mockFs, times(1)).listStatus(Mockito.any(Path.class));
+
+    // Prepare expected set of file paths
+    Set<String> expectedPaths = new HashSet<>();
+    expectedPaths.add(file1.toString());
+    expectedPaths.add(file2.toString());
+    expectedPaths.add(file3.toString());
+
+    // Create actual set of paths
+    Set<String> actualPaths =
+      result.stream().map(path -> 
path.toUri().getPath()).collect(Collectors.toSet());
+
+    // Verify all files from the round are returned
+    assertEquals("Should return exactly 3 files from the round", 
expectedPaths.size(),
+      actualPaths.size());
+    assertEquals("File paths do not match", expectedPaths, actualPaths);
+  }
+
+  @Test
+  public void testGetNewFilesWithInvalidRange() throws IOException {
+    // Initialize tracker
+    tracker.init();
+
+    ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+    long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 
1000L;
+
+    // Create end round (earlier time)
+    long endRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00
+    long endRoundEndTime = endRoundStartTime + roundDurationMs;
+    ReplicationRound endRound = new ReplicationRound(endRoundStartTime, 
endRoundEndTime);
+
+    // Create start round (later time) - invalid: start > end
+    long startRoundStartTime = endRoundEndTime;
+    long startRoundEndTime = startRoundStartTime + roundDurationMs;
+    ReplicationRound startRound = new ReplicationRound(startRoundStartTime, 
startRoundEndTime);
+
+    // Get shard directories
+    Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+    Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+
+    // Create shard directories
+    localFs.mkdirs(startRoundShardDir);
+    localFs.mkdirs(endRoundShardDir);
+
+    // Create files in both rounds
+    Path startRoundFile = new Path(startRoundShardDir, startRoundStartTime + 
"_rs1.plog");
+    Path endRoundFile = new Path(endRoundShardDir, endRoundStartTime + 
"_rs2.plog");
+
+    localFs.create(startRoundFile, true).close();
+    localFs.create(endRoundFile, true).close();
+
+    // Call getNewFiles with invalid range (startRound.getStartTime() > 
endRound.getStartTime())
+    List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+    // Verify empty list is returned when startRound.getStartTime() > 
endRound.getStartTime()
+    assertTrue("Should return empty list for invalid range", result.isEmpty());
+
+    // Verify no file system operations were performed on shard directories 
(early return)
+    // Note: init() already called exists() and mkdirs() on in-progress 
directory
+    Mockito.verify(mockFs, times(0)).exists(Mockito.eq(startRoundShardDir));
+    Mockito.verify(mockFs, times(0)).exists(Mockito.eq(endRoundShardDir));
+    Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
+  }
+
+  @Test
+  public void testGetNewFilesWithEmptyRounds() throws IOException {
+    // Initialize tracker
+    tracker.init();
+
+    ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+    long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 
1000L;
+
+    // Create start round
+    long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00
+    long startRoundEndTime = startRoundStartTime + roundDurationMs;
+    ReplicationRound startRound = new ReplicationRound(startRoundStartTime, 
startRoundEndTime);
+
+    // Create end round
+    long endRoundStartTime = startRoundEndTime;
+    long endRoundEndTime = endRoundStartTime + roundDurationMs;
+    ReplicationRound endRound = new ReplicationRound(endRoundStartTime, 
endRoundEndTime);
+
+    // Get shard directories
+    Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+    Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+
+    // Create shard directories but leave them empty
+    localFs.mkdirs(startRoundShardDir);
+    localFs.mkdirs(endRoundShardDir);
+
+    // Call getNewFiles with empty rounds
+    List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+    // Verify file system operations
+    // Should call getNewFilesForRound for each round (start and end)
+    // Note: init() already called exists() on in-progress directory
+    Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir));
+    Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir));
+    Mockito.verify(mockFs, times(2)).listStatus(Mockito.any(Path.class));
+
+    // Verify empty list is returned
+    assertTrue("Should return empty list for empty rounds", result.isEmpty());
+  }
+
+  @Test
+  public void testGetNewFilesWithNonExistentRounds() throws IOException {
+    // Initialize tracker
+    tracker.init();
+
+    ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+    long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 
1000L;
+
+    // Create start round
+    long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00
+    long startRoundEndTime = startRoundStartTime + roundDurationMs;
+    ReplicationRound startRound = new ReplicationRound(startRoundStartTime, 
startRoundEndTime);
+
+    // Create end round
+    long endRoundStartTime = startRoundEndTime;
+    long endRoundEndTime = endRoundStartTime + roundDurationMs;
+    ReplicationRound endRound = new ReplicationRound(endRoundStartTime, 
endRoundEndTime);
+
+    // Get shard directories
+    Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+    Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+
+    // Assert that shard directories do not exist
+    assertFalse("Start round shard directory should not exist", 
localFs.exists(startRoundShardDir));
+    assertFalse("End round shard directory should not exist", 
localFs.exists(endRoundShardDir));
+
+    // Call getNewFiles with non-existent rounds
+    List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+    // Verify file system operations
+    // Should call exists() for each round (start and end)
+    // Note: init() already called exists() on in-progress directory
+    Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir));
+    Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir));
+    // listStatus() should not be called when directories don't exist
+    Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
+
+    // Verify empty list is returned
+    assertTrue("Should return empty list for non-existent rounds", 
result.isEmpty());
+  }
+
   private int countDirectories(FileSystem fs, Path path) throws IOException {
     if (!fs.exists(path)) {
       return 0;

Reply via email to