This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 30aaabb457 Phoenix-7672 Adding API to get new files within range of
rounds (#2355)
30aaabb457 is described below
commit 30aaabb457e7e606271e8a35e15f4ba0bcecc26a
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Tue Jan 27 09:28:51 2026 +0530
Phoenix-7672 Adding API to get new files within range of rounds (#2355)
---
.../phoenix/replication/ReplicationLogTracker.java | 28 ++
.../reader/ReplicationLogDiscoveryReplay.java | 43 +++-
.../ReplicationLogDiscoveryReplayTestIT.java | 34 ++-
.../replication/ReplicationLogTrackerTest.java | 286 +++++++++++++++++++++
4 files changed, 373 insertions(+), 18 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
index 92031c1349..03832a4d85 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
@@ -151,6 +151,34 @@ public class ReplicationLogTracker {
return filesInRound;
}
+  /**
+   * Retrieves new replication log files that belong to the replication rounds from
+   * {@code startRound} to {@code endRound} (both inclusive). Walks the rounds in order with
+   * {@code replicationShardDirectoryManager.getNextRound} and collects the valid log files of
+   * each round through {@code getNewFilesForRound}.
+   * <p>
+   * The walk is bounded by round start times instead of round equality, so it also terminates
+   * when {@code endRound} cannot be reached exactly by repeated {@code getNextRound} calls (for
+   * example when both rounds were derived from differently aligned timestamps). This relies on
+   * {@code getNextRound} returning a round with a later start time - NOTE(review): confirm.
+   * @param startRound - The starting replication round (inclusive)
+   * @param endRound - The ending replication round (inclusive)
+   * @return List of valid log file paths from startRound to endRound, empty list if startRound
+   *         starts after endRound
+   * @throws IOException if there's an error accessing the file system
+   */
+  public List<Path> getNewFiles(ReplicationRound startRound, ReplicationRound endRound)
+    throws IOException {
+    List<Path> files = new ArrayList<>();
+    final long endRoundStartTime = endRound.getStartTime();
+    // Early return if startRound is after endRound (invalid range)
+    if (startRound.getStartTime() > endRoundStartTime) {
+      return files;
+    }
+    // Collect every round that starts strictly before endRound. Comparing start times (rather
+    // than waiting for currentRound.equals(endRound)) guarantees the loop stops even if the
+    // walk steps over endRound without ever being equal to it.
+    ReplicationRound currentRound = startRound;
+    while (currentRound.getStartTime() < endRoundStartTime) {
+      files.addAll(getNewFilesForRound(currentRound));
+      currentRound = replicationShardDirectoryManager.getNextRound(currentRound);
+    }
+    // Add the files for the endRound (inclusive)
+    files.addAll(getNewFilesForRound(endRound));
+    return files;
+  }
+
/**
* Retrieves all valid log files currently in the in-progress directory.
* @return List of valid log file paths in the in-progress directory, empty
list if directory
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index 36daca8b97..b6ce96e62c 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.jdbc.HAGroupStoreRecord;
import org.apache.phoenix.replication.ReplicationLogDiscovery;
import org.apache.phoenix.replication.ReplicationLogTracker;
import org.apache.phoenix.replication.ReplicationRound;
+import org.apache.phoenix.replication.ReplicationShardDirectoryManager;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery;
import
org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl;
import org.slf4j.Logger;
@@ -455,15 +456,47 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
* Determines whether failover should be triggered based on completion
criteria. Failover is safe
* to trigger when all of the following conditions are met: 1. A failover
has been requested
* (failoverPending is true) 2. No files are currently in the in-progress
directory 3. No new
- * files exist for ongoing round These conditions ensure all replication
logs have been processed
- * before transitioning the cluster from STANDBY to ACTIVE state.
+ * files exist from the next round to process up to the current timestamp
round. The third
+ * condition checks for new files in the range from nextRoundToProcess
(derived from
+ * getLastRoundProcessed()) to currentTimestampRound (derived from current
time). This ensures all
+ * replication logs up to the current time have been processed before
transitioning the cluster
+ * from STANDBY to ACTIVE state.
* @return true if all conditions are met and failover should be triggered,
false otherwise
* @throws IOException if there's an error checking file status
*/
protected boolean shouldTriggerFailover() throws IOException {
- return failoverPending.get() &&
replicationLogTracker.getInProgressFiles().isEmpty()
- && replicationLogTracker.getNewFilesForRound(replicationLogTracker
-
.getReplicationShardDirectoryManager().getNextRound(getLastRoundProcessed())).isEmpty();
+ LOG.debug("Checking if failover should be triggered. failoverPending={}",
failoverPending);
+ // Condition 1: a failover must have been requested; otherwise return false right away
+ if (!failoverPending.get()) {
+ LOG.debug("Failover not triggered. failoverPending is false.");
+ return false;
+ }
+ // Condition 2: the tracker must report no files in the in-progress directory
+ boolean isInProgressDirectoryEmpty =
replicationLogTracker.getInProgressFiles().isEmpty();
+ if (!isInProgressDirectoryEmpty) {
+ LOG.debug("Failover not triggered. In progress directory is not empty.");
+ return false;
+ }
+ // Condition 3: no new files in any round from the round after getLastRoundProcessed() through the current timestamp
round
+ ReplicationShardDirectoryManager replicationShardDirectoryManager =
+ replicationLogTracker.getReplicationShardDirectoryManager();
+ ReplicationRound nextRoundToProcess =
+ replicationShardDirectoryManager.getNextRound(getLastRoundProcessed());
+ ReplicationRound currentTimestampRound = replicationShardDirectoryManager
+ .getReplicationRoundFromStartTime(EnvironmentEdgeManager.currentTime());
+ LOG.debug("Checking the new files from next round {} to current timestamp
round {}.",
+ nextRoundToProcess, currentTimestampRound);
+ boolean isInDirectoryEmpty =
+ replicationLogTracker.getNewFiles(nextRoundToProcess,
currentTimestampRound).isEmpty();
+ // Note: getNewFiles returns an empty list when nextRoundToProcess starts after currentTimestampRound, so that case passes this check
+ if (!isInDirectoryEmpty) {
+ LOG.debug(
+ "Failover not triggered. New files exist from next round to current "
+ "timestamp round.");
+ return false;
+ }
+
+ LOG.info("Failover can be triggered.");
+ return true;
}
protected void triggerFailover() {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index 7668838026..27d62e71ea 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -1816,7 +1816,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
/**
* Tests the shouldTriggerFailover method with various combinations of
failoverPending,
- * in-progress files, and new files for next round.
+ * in-progress files, and new files from next round to current timestamp
round.
*/
@Test
public void testShouldTriggerFailover() throws IOException {
@@ -1838,13 +1838,16 @@ public class ReplicationLogDiscoveryReplayTestIT
extends BaseTest {
try {
// Create test rounds
ReplicationRound testRound = new ReplicationRound(1704153600000L,
1704153660000L);
- ReplicationRound nextRound =
- tracker.getReplicationShardDirectoryManager().getNextRound(testRound);
+ ReplicationShardDirectoryManager shardManager =
tracker.getReplicationShardDirectoryManager();
+ ReplicationRound nextRoundToProcess =
shardManager.getNextRound(testRound);
+ ReplicationRound currentTimestampRound =
+ shardManager.getReplicationRoundFromStartTime(currentTime);
// Test Case 1: All conditions true - should return true
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+ when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
+ .thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker,
haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
@@ -1858,7 +1861,8 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Test Case 2: failoverPending is false - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+ when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
+ .thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker,
haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
@@ -1873,7 +1877,8 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
{
when(tracker.getInProgressFiles())
.thenReturn(Collections.singletonList(new Path("test.plog")));
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+ when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
+ .thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker,
haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
@@ -1884,10 +1889,11 @@ public class ReplicationLogDiscoveryReplayTestIT
extends BaseTest {
discovery.shouldTriggerFailover());
}
- // Test Case 4: new files exist for next round - should return false
+ // Test Case 4: new files exist from next round to current timestamp
round - should return
+ // false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
- when(tracker.getNewFilesForRound(nextRound))
+ when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
.thenReturn(Collections.singletonList(new Path("test.plog")));
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker,
haGroupStoreRecord);
@@ -1895,7 +1901,8 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(true);
- assertFalse("Should not trigger failover when new files exist for next
round",
+ assertFalse(
+ "Should not trigger failover when new files exist from next round to
current timestamp round",
discovery.shouldTriggerFailover());
}
@@ -1903,7 +1910,8 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
{
when(tracker.getInProgressFiles())
.thenReturn(Collections.singletonList(new Path("test.plog")));
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+ when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
+ .thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker,
haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
@@ -1918,7 +1926,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Test Case 6: failoverPending false AND new files exist - should
return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
- when(tracker.getNewFilesForRound(nextRound))
+ when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
.thenReturn(Collections.singletonList(new Path("test.plog")));
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker,
haGroupStoreRecord);
@@ -1934,7 +1942,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
{
when(tracker.getInProgressFiles())
.thenReturn(Collections.singletonList(new Path("test1.plog")));
- when(tracker.getNewFilesForRound(nextRound))
+ when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
.thenReturn(Collections.singletonList(new Path("test2.plog")));
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker,
haGroupStoreRecord);
@@ -1950,7 +1958,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
{
when(tracker.getInProgressFiles())
.thenReturn(Collections.singletonList(new Path("test.plog")));
- when(tracker.getNewFilesForRound(nextRound))
+ when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound))
.thenReturn(Collections.singletonList(new Path("test2.plog")));
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker,
haGroupStoreRecord);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
index 2a2b6055ae..2037b25c0e 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
@@ -1362,6 +1362,292 @@ public class ReplicationLogTrackerTest {
resultFilenames.contains(invalidFile3.getName()));
}
+  /**
+   * getNewFiles over three consecutive rounds must return the files of exactly those rounds and
+   * none of the files that live in the round before or the round after the range.
+   */
+  @Test
+  public void testGetNewFilesWithStartAndEndRound() throws IOException {
+    tracker.init();
+
+    ReplicationShardDirectoryManager dirManager = tracker.getReplicationShardDirectoryManager();
+    long roundMillis = dirManager.getReplicationRoundDurationSeconds() * 1000L;
+
+    // Three back-to-back rounds, the first one starting at 2024-01-02 00:00:00
+    long firstStart = 1704153600000L;
+    long secondStart = firstStart + roundMillis;
+    long thirdStart = secondStart + roundMillis;
+    ReplicationRound startRound = new ReplicationRound(firstStart, secondStart);
+    ReplicationRound middleRound = new ReplicationRound(secondStart, thirdStart);
+    ReplicationRound endRound = new ReplicationRound(thirdStart, thirdStart + roundMillis);
+
+    // Shard directories of the rounds inside the requested range
+    Path startDir = dirManager.getShardDirectory(startRound);
+    Path middleDir = dirManager.getShardDirectory(middleRound);
+    Path endDir = dirManager.getShardDirectory(endRound);
+    localFs.mkdirs(startDir);
+    localFs.mkdirs(middleDir);
+    localFs.mkdirs(endDir);
+
+    // Two log files per in-range round
+    Path[] inRangeFiles = {
+      new Path(startDir, firstStart + "_rs1.plog"),
+      new Path(startDir, (firstStart + 30000) + "_rs2.plog"),
+      new Path(middleDir, secondStart + "_rs3.plog"),
+      new Path(middleDir, (secondStart + 30000) + "_rs4.plog"),
+      new Path(endDir, thirdStart + "_rs5.plog"),
+      new Path(endDir, (thirdStart + 30000) + "_rs6.plog") };
+
+    // One log file in the round immediately before the range ...
+    ReplicationRound roundBefore = dirManager.getPreviousRound(startRound);
+    Path beforeDir = dirManager.getShardDirectory(roundBefore);
+    localFs.mkdirs(beforeDir);
+    Path fileBefore = new Path(beforeDir, roundBefore.getStartTime() + "_rs0.plog");
+
+    // ... and one in the round immediately after it
+    ReplicationRound roundAfter = dirManager.getNextRound(endRound);
+    Path afterDir = dirManager.getShardDirectory(roundAfter);
+    localFs.mkdirs(afterDir);
+    Path fileAfter = new Path(afterDir, roundAfter.getStartTime() + "_rs7.plog");
+
+    // Materialize every file on the local file system
+    for (Path file : inRangeFiles) {
+      localFs.create(file, true).close();
+    }
+    localFs.create(fileBefore, true).close();
+    localFs.create(fileAfter, true).close();
+
+    List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+    // One exists() per in-range shard directory and three listStatus() calls in total
+    // (init() only touched the in-progress directory)
+    for (Path dir : new Path[] { startDir, middleDir, endDir }) {
+      Mockito.verify(mockFs, times(1)).exists(Mockito.eq(dir));
+    }
+    Mockito.verify(mockFs, times(3)).listStatus(Mockito.any(Path.class));
+
+    // Expected: the six in-range files, compared by path string
+    Set<String> expectedPaths = new HashSet<>();
+    for (Path file : inRangeFiles) {
+      expectedPaths.add(file.toString());
+    }
+
+    // Actual: the path component of every returned file
+    Set<String> actualPaths = new HashSet<>();
+    for (Path file : result) {
+      actualPaths.add(file.toUri().getPath());
+    }
+
+    assertEquals("Should return exactly 6 files from start to end rounds", expectedPaths.size(),
+      actualPaths.size());
+    assertEquals("File paths do not match", expectedPaths, actualPaths);
+
+    // Files of the neighbouring rounds must not leak into the result
+    assertFalse("Should not contain file from before start round",
+      actualPaths.contains(fileBefore.toString()));
+    assertFalse("Should not contain file from after end round",
+      actualPaths.contains(fileAfter.toString()));
+  }
+
+  /**
+   * When start and end are the same round, getNewFiles must scan that round exactly once and
+   * return all of its files.
+   */
+  @Test
+  public void testGetNewFilesWithSameStartAndEndRound() throws IOException {
+    tracker.init();
+
+    ReplicationShardDirectoryManager dirManager = tracker.getReplicationShardDirectoryManager();
+    long roundMillis = dirManager.getReplicationRoundDurationSeconds() * 1000L;
+
+    // The single round under test, starting at 2024-01-02 00:00:00
+    long roundStart = 1704153600000L;
+    ReplicationRound onlyRound = new ReplicationRound(roundStart, roundStart + roundMillis);
+
+    Path shardDir = dirManager.getShardDirectory(onlyRound);
+    localFs.mkdirs(shardDir);
+
+    // Three log files inside the round's shard directory
+    Path[] roundFiles = {
+      new Path(shardDir, roundStart + "_rs1.plog"),
+      new Path(shardDir, (roundStart + 30000) + "_rs2.plog"),
+      new Path(shardDir, (roundStart + 50000) + "_rs3.plog") };
+    for (Path file : roundFiles) {
+      localFs.create(file, true).close();
+    }
+
+    List<Path> result = tracker.getNewFiles(onlyRound, onlyRound);
+
+    // The round is visited once: a single exists() and a single listStatus()
+    // (init() only touched the in-progress directory)
+    Mockito.verify(mockFs, times(1)).exists(Mockito.eq(shardDir));
+    Mockito.verify(mockFs, times(1)).listStatus(Mockito.any(Path.class));
+
+    Set<String> expectedPaths = new HashSet<>();
+    for (Path file : roundFiles) {
+      expectedPaths.add(file.toString());
+    }
+
+    Set<String> actualPaths = new HashSet<>();
+    for (Path file : result) {
+      actualPaths.add(file.toUri().getPath());
+    }
+
+    assertEquals("Should return exactly 3 files from the round", expectedPaths.size(),
+      actualPaths.size());
+    assertEquals("File paths do not match", expectedPaths, actualPaths);
+  }
+
+  /**
+   * A range whose start round begins after its end round is invalid: getNewFiles must return an
+   * empty list without looking at any shard directory.
+   */
+  @Test
+  public void testGetNewFilesWithInvalidRange() throws IOException {
+    tracker.init();
+
+    ReplicationShardDirectoryManager dirManager = tracker.getReplicationShardDirectoryManager();
+    long roundMillis = dirManager.getReplicationRoundDurationSeconds() * 1000L;
+
+    // The "end" round is the earlier one (2024-01-02 00:00:00) ...
+    long earlierStart = 1704153600000L;
+    long laterStart = earlierStart + roundMillis;
+    ReplicationRound endRound = new ReplicationRound(earlierStart, laterStart);
+    // ... and the "start" round follows it, which makes start > end
+    ReplicationRound startRound = new ReplicationRound(laterStart, laterStart + roundMillis);
+
+    Path startDir = dirManager.getShardDirectory(startRound);
+    Path endDir = dirManager.getShardDirectory(endRound);
+    localFs.mkdirs(startDir);
+    localFs.mkdirs(endDir);
+
+    // Both rounds hold a file, so a non-empty result would prove the range was scanned
+    localFs.create(new Path(startDir, laterStart + "_rs1.plog"), true).close();
+    localFs.create(new Path(endDir, earlierStart + "_rs2.plog"), true).close();
+
+    List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+    assertTrue("Should return empty list for invalid range", result.isEmpty());
+
+    // Early return: the shard directories were never probed or listed
+    // (init() only touched the in-progress directory)
+    Mockito.verify(mockFs, Mockito.never()).exists(Mockito.eq(startDir));
+    Mockito.verify(mockFs, Mockito.never()).exists(Mockito.eq(endDir));
+    Mockito.verify(mockFs, Mockito.never()).listStatus(Mockito.any(Path.class));
+  }
+
+  /**
+   * Rounds whose shard directories exist but contain no files must still be scanned, and the
+   * combined result must be empty.
+   */
+  @Test
+  public void testGetNewFilesWithEmptyRounds() throws IOException {
+    tracker.init();
+
+    ReplicationShardDirectoryManager dirManager = tracker.getReplicationShardDirectoryManager();
+    long roundMillis = dirManager.getReplicationRoundDurationSeconds() * 1000L;
+
+    // Two adjacent rounds, the first one starting at 2024-01-02 00:00:00
+    long firstStart = 1704153600000L;
+    long secondStart = firstStart + roundMillis;
+    ReplicationRound startRound = new ReplicationRound(firstStart, secondStart);
+    ReplicationRound endRound = new ReplicationRound(secondStart, secondStart + roundMillis);
+
+    // The directories are created but deliberately left without files
+    Path startDir = dirManager.getShardDirectory(startRound);
+    Path endDir = dirManager.getShardDirectory(endRound);
+    localFs.mkdirs(startDir);
+    localFs.mkdirs(endDir);
+
+    List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+    // Each round is probed once and listed once
+    // (init() only touched the in-progress directory)
+    for (Path dir : new Path[] { startDir, endDir }) {
+      Mockito.verify(mockFs, times(1)).exists(Mockito.eq(dir));
+    }
+    Mockito.verify(mockFs, times(2)).listStatus(Mockito.any(Path.class));
+
+    assertTrue("Should return empty list for empty rounds", result.isEmpty());
+  }
+
+  /**
+   * Rounds whose shard directories were never created must be probed with exists() only; no
+   * listing happens and the result is empty.
+   */
+  @Test
+  public void testGetNewFilesWithNonExistentRounds() throws IOException {
+    tracker.init();
+
+    ReplicationShardDirectoryManager dirManager = tracker.getReplicationShardDirectoryManager();
+    long roundMillis = dirManager.getReplicationRoundDurationSeconds() * 1000L;
+
+    // Two adjacent rounds, the first one starting at 2024-01-02 00:00:00
+    long firstStart = 1704153600000L;
+    long secondStart = firstStart + roundMillis;
+    ReplicationRound startRound = new ReplicationRound(firstStart, secondStart);
+    ReplicationRound endRound = new ReplicationRound(secondStart, secondStart + roundMillis);
+
+    Path startDir = dirManager.getShardDirectory(startRound);
+    Path endDir = dirManager.getShardDirectory(endRound);
+
+    // Precondition: neither shard directory is present on the file system
+    assertFalse("Start round shard directory should not exist", localFs.exists(startDir));
+    assertFalse("End round shard directory should not exist", localFs.exists(endDir));
+
+    List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+    // Each round is probed once (init() only touched the in-progress directory) ...
+    for (Path dir : new Path[] { startDir, endDir }) {
+      Mockito.verify(mockFs, times(1)).exists(Mockito.eq(dir));
+    }
+    // ... but nothing is listed because the directories are missing
+    Mockito.verify(mockFs, Mockito.never()).listStatus(Mockito.any(Path.class));
+
+    assertTrue("Should return empty list for non-existent rounds", result.isEmpty());
+  }
+
private int countDirectories(FileSystem fs, Path path) throws IOException {
if (!fs.exists(path)) {
return 0;