This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new bc1e05878d PHOENIX-7785 Avoid infinite loops & duplicate file 
processing for in progress directory (#2395)
bc1e05878d is described below

commit bc1e05878db6ec2fafc33dbc8718474a184fe6ab
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Sun Apr 12 01:56:23 2026 +0530

    PHOENIX-7785 Avoid infinite loops & duplicate file processing for in 
progress directory (#2395)
---
 .../replication/ReplicationLogDiscovery.java       |  74 +-
 .../phoenix/replication/ReplicationLogTracker.java | 130 +--
 .../replication/ReplicationLogDiscoveryTest.java   | 940 +++++++++++----------
 .../replication/ReplicationLogTrackerTest.java     | 352 ++++----
 4 files changed, 857 insertions(+), 639 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 56f40866a7..175de94f87 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -18,7 +18,9 @@
 package org.apache.phoenix.replication;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -83,6 +85,25 @@ public abstract class ReplicationLogDiscovery {
    */
   protected static final double DEFAULT_WAITING_BUFFER_PERCENTAGE = 15.0;
 
+  /**
+   * Configuration key for maximum number of retries per in-progress file 
within a single processing
+   * round. Files that fail this many times are skipped for the rest of the 
round.
+   */
+  public static final String REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY =
+    "phoenix.replication.in.progress.file.max.retries";
+
+  public static final int DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES = 1;
+
+  /**
+   * Configuration key for the minimum age (in seconds) of an in-progress 
file's rename timestamp
+   * before it becomes eligible for processing. This prevents a file recently 
marked in-progress by
+   * one region server from being immediately picked up by another.
+   */
+  public static final String REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY =
+    "phoenix.replication.in.progress.file.min.age.seconds";
+
+  public static final int DEFAULT_IN_PROGRESS_FILE_MIN_AGE_SECONDS = 60;
+
   protected final Configuration conf;
   protected final String haGroupName;
   protected final ReplicationLogTracker replicationLogTracker;
@@ -283,8 +304,9 @@ public abstract class ReplicationLogDiscovery {
   }
 
   /**
-   * Processes all files (older than 1 round time) in the in-progress 
directory. Continuously
-   * processes files until no in-progress files remain.
+   * Processes all files in the in-progress directory whose rename timestamp 
is older than the
+   * configured minimum age. Continuously processes files until no eligible 
in-progress files
+   * remain.
    * @throws IOException if there's an error during file processing
    */
   protected void processInProgressDirectory() throws IOException {
@@ -293,16 +315,31 @@ public abstract class ReplicationLogDiscovery {
     // Increase the count for number of times in progress directory is 
processed
     getMetrics().incrementNumInProgressDirectoryProcessed();
     long startTime = EnvironmentEdgeManager.currentTime();
-    long oldestTimestampToProcess =
-      
replicationLogTracker.getReplicationShardDirectoryManager().getNearestRoundStartTimestamp(
-        EnvironmentEdgeManager.currentTime()) - getReplayIntervalSeconds() * 
1000L;
-    List<Path> files = 
replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
-    LOG.info("Number of {} files with oldestTimestampToProcess {} is {} for 
haGroup: {}",
-      replicationLogTracker.getInProgressLogSubDirectoryName(), 
oldestTimestampToProcess,
+    long renameTimestampThreshold =
+      EnvironmentEdgeManager.currentTime() - getInProgressFileMinAgeSeconds() 
* 1000L;
+    int maxRetries = getInProgressFileMaxRetries();
+    Map<String, Integer> failureCount = new HashMap<>();
+    List<Path> files = 
replicationLogTracker.getOlderInProgressFiles(renameTimestampThreshold);
+    LOG.info("Number of {} files with renameTimestampThreshold {} is {} for 
haGroup: {}",
+      replicationLogTracker.getInProgressLogSubDirectoryName(), 
renameTimestampThreshold,
       files.size(), haGroupName);
     while (!files.isEmpty()) {
-      processOneRandomFile(files);
-      files = 
replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
+      Optional<Path> failedFile = processOneRandomFile(files);
+      if (failedFile.isPresent()) {
+        String prefix = replicationLogTracker.getFilePrefix(failedFile.get());
+        int count = failureCount.merge(prefix, 1, Integer::sum);
+        if (count >= maxRetries) {
+          LOG.warn(
+            "File {} (prefix: {}) has failed {} time(s), reached max retries 
({}). "
+              + "Skipping for the rest of this round for haGroup: {}",
+            failedFile.get(), prefix, count, maxRetries, haGroupName);
+        }
+      }
+      renameTimestampThreshold =
+        EnvironmentEdgeManager.currentTime() - 
getInProgressFileMinAgeSeconds() * 1000L;
+      files = 
replicationLogTracker.getOlderInProgressFiles(renameTimestampThreshold);
+      files.removeIf(
+        f -> failureCount.getOrDefault(replicationLogTracker.getFilePrefix(f), 
0) >= maxRetries);
     }
     long duration = EnvironmentEdgeManager.currentTime() - startTime;
     LOG.info("Finished in-progress files processing in {}ms for haGroup: {}", 
duration,
@@ -314,8 +351,9 @@ public abstract class ReplicationLogDiscovery {
    * Processes a single random file from the provided list. Marks the file as 
in-progress, processes
    * it, and marks it as completed or failed.
    * @param files - List of files from which to select and process one randomly
+   * @return the original path of the file that failed, or empty if processing 
succeeded
    */
-  private void processOneRandomFile(final List<Path> files) throws IOException 
{
+  private Optional<Path> processOneRandomFile(final List<Path> files) throws 
IOException {
     // Pick a random file and process it
     Path file = files.get(ThreadLocalRandom.current().nextInt(files.size()));
     Optional<Path> optionalInProgressFilePath = Optional.empty();
@@ -329,9 +367,9 @@ public abstract class ReplicationLogDiscovery {
       LOG.error("Failed to process the file {}", file, exception);
       optionalInProgressFilePath.ifPresent(replicationLogTracker::markFailed);
       // Not throwing this exception because next time another random file 
will be retried.
-      // If it's persistent failure for in_progress directory,
-      // cluster state should to be DEGRADED_STANDBY_FOR_READER.
+      return Optional.of(file);
     }
+    return Optional.empty();
   }
 
   /**
@@ -469,6 +507,16 @@ public abstract class ReplicationLogDiscovery {
     return DEFAULT_WAITING_BUFFER_PERCENTAGE;
   }
 
+  public int getInProgressFileMaxRetries() {
+    return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
+      DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES);
+  }
+
+  public int getInProgressFileMinAgeSeconds() {
+    return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
+      DEFAULT_IN_PROGRESS_FILE_MIN_AGE_SECONDS);
+  }
+
   public ReplicationLogTracker getReplicationLogFileTracker() {
     return this.replicationLogTracker;
   }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
index 03832a4d85..b6a7a105e1 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -203,15 +203,15 @@ public class ReplicationLogTracker {
   }
 
   /**
-   * Retrieves all valid log files in the in-progress directory that are older 
than the specified
-   * timestamp.
-   * @param timestampThreshold - The timestamp threshold in milliseconds. 
Files with timestamps less
-   *                           than this value will be returned.
-   * @return List of valid log file paths in the in-progress directory that 
are older than the
+   * Retrieves all valid log files in the in-progress directory whose rename 
timestamp is older than
+   * the specified threshold. Files without a rename timestamp are skipped.
+   * @param renameTimestampThreshold - The timestamp threshold in 
milliseconds. Files with rename
+   *                                 timestamps less than this value will be 
returned.
+   * @return List of valid log file paths in the in-progress directory that 
were renamed before the
    *         threshold, empty list if directory doesn't exist or no files match
    * @throws IOException if there's an error accessing the file system
    */
-  public List<Path> getOlderInProgressFiles(long timestampThreshold) throws 
IOException {
+  public List<Path> getOlderInProgressFiles(long renameTimestampThreshold) 
throws IOException {
     if (!fileSystem.exists(getInProgressDirPath())) {
       return Collections.emptyList();
     }
@@ -221,20 +221,19 @@ public class ReplicationLogTracker {
 
     for (FileStatus status : fileStatuses) {
       if (status.isFile() && isValidLogFile(status.getPath())) {
-        try {
-          long fileTimestamp = getFileTimestamp(status.getPath());
-          if (fileTimestamp < timestampThreshold) {
-            olderInProgressFiles.add(status.getPath());
-          }
-        } catch (NumberFormatException e) {
-          LOG.warn("Failed to extract timestamp from file {}, skipping",
-            status.getPath().getName());
+        Optional<Long> renameTimestamp = getRenameTimestamp(status.getPath());
+        if (!renameTimestamp.isPresent()) {
+          LOG.warn("File {} has no rename timestamp, skipping", 
status.getPath().getName());
+          continue;
+        }
+        if (renameTimestamp.get() < renameTimestampThreshold) {
+          olderInProgressFiles.add(status.getPath());
         }
       }
     }
 
-    LOG.debug("Found {} in-progress files older than timestamp {}", 
olderInProgressFiles.size(),
-      timestampThreshold);
+    LOG.debug("Found {} in-progress files renamed before timestamp {}", 
olderInProgressFiles.size(),
+      renameTimestampThreshold);
     return olderInProgressFiles;
   }
 
@@ -267,7 +266,7 @@ public class ReplicationLogTracker {
    * @return true if file was successfully deleted, false otherwise
    */
   protected boolean markCompleted(final Path file) {
-    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    long startTime = EnvironmentEdgeManager.currentTime();
     // Increment the metrics count
     getMetrics().incrementMarkFileCompletedRequestCount();
 
@@ -282,7 +281,7 @@ public class ReplicationLogTracker {
       try {
         if (fileSystem.delete(fileToDelete, false)) {
           LOG.info("Successfully deleted completed file: {}", fileToDelete);
-          long endTime = EnvironmentEdgeManager.currentTimeMillis();
+          long endTime = EnvironmentEdgeManager.currentTime();
           getMetrics().updateMarkFileCompletedTime(endTime - startTime);
           return true;
         } else {
@@ -323,13 +322,13 @@ public class ReplicationLogTracker {
           } else if (matchingFiles.size() > 1) {
             LOG.warn("Multiple matching in-progress files found for prefix {}: 
{}", filePrefix,
               matchingFiles.size());
-            long endTime = EnvironmentEdgeManager.currentTimeMillis();
+            long endTime = EnvironmentEdgeManager.currentTime();
             getMetrics().updateMarkFileCompletedTime(endTime - startTime);
             return false;
           } else {
             LOG.warn("No matching in-progress file found for prefix: {}. File 
must " + "have "
               + "been deleted by some other process.", filePrefix);
-            long endTime = EnvironmentEdgeManager.currentTimeMillis();
+            long endTime = EnvironmentEdgeManager.currentTime();
             getMetrics().updateMarkFileCompletedTime(endTime - startTime);
             return true;
           }
@@ -341,7 +340,7 @@ public class ReplicationLogTracker {
       }
     }
 
-    long endTime = EnvironmentEdgeManager.currentTimeMillis();
+    long endTime = EnvironmentEdgeManager.currentTime();
     getMetrics().updateMarkFileCompletedTime(endTime - startTime);
 
     LOG.error("Failed to delete file after {} attempts: {}", maxRetries + 1, 
fileToDelete);
@@ -355,7 +354,7 @@ public class ReplicationLogTracker {
    * @return Optional value of renamed path if file rename was successful, 
else Optional.empty()
    */
   protected Optional<Path> markInProgress(final Path file) {
-    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    long startTime = EnvironmentEdgeManager.currentTime();
     try {
 
       final String fileName = file.getName();
@@ -364,26 +363,20 @@ public class ReplicationLogTracker {
 
       // Check if file is already in in-progress directory
       if 
(file.getParent().toUri().getPath().equals(getInProgressDirPath().toString())) {
-        // File is already in in-progress directory, replace UUID with a new 
one
+        // File is already in in-progress directory, replace UUID and rename 
timestamp
         // keep the directory same as in progress
+        // Format: <ts>_<server>_<UUID>_<renameTs>.plog → extract prefix 
(first 2 parts)
         String[] parts = fileName.split("_");
-        // Remove the last part (UUID) and add new UUID
-        StringBuilder newNameBuilder = new StringBuilder();
-        for (int i = 0; i < parts.length - 1; i++) {
-          if (i > 0) {
-            newNameBuilder.append("_");
-          }
-          newNameBuilder.append(parts[i]);
-        }
-        String extension = fileName.substring(fileName.lastIndexOf("."));
-        newNameBuilder.append("_").append(UUID.randomUUID()).append(extension);
-        newFileName = newNameBuilder.toString();
+        String prefix = parts[0] + "_" + parts[1];
+        newFileName =
+          prefix + "_" + UUID.randomUUID() + "_" + 
EnvironmentEdgeManager.currentTime() + ".plog";
         targetDirectory = file.getParent();
       } else {
-        // File is not in in-progress directory, add UUID and move to 
IN_PROGRESS directory
+        // File is not in in-progress directory, add UUID + rename timestamp 
and move to
+        // IN_PROGRESS directory
         String baseName = fileName.substring(0, fileName.lastIndexOf("."));
-        String extension = fileName.substring(fileName.lastIndexOf("."));
-        newFileName = baseName + "_" + UUID.randomUUID() + extension;
+        newFileName =
+          baseName + "_" + UUID.randomUUID() + "_" + 
EnvironmentEdgeManager.currentTime() + ".plog";
         targetDirectory = getInProgressDirPath();
       }
 
@@ -401,7 +394,7 @@ public class ReplicationLogTracker {
     } finally {
       // Update the metrics
       getMetrics().incrementMarkFileInProgressRequestCount();
-      long endTime = EnvironmentEdgeManager.currentTimeMillis();
+      long endTime = EnvironmentEdgeManager.currentTime();
       getMetrics().updateMarkFileInProgressTime(endTime - startTime);
     }
   }
@@ -427,42 +420,55 @@ public class ReplicationLogTracker {
   }
 
   /**
-   * Extracts the UUID from a log file name. Assumes UUID is the last part of 
the filename before
-   * the extension.
+   * Extracts the UUID from an in-progress file name. Format: 
<ts>_<server>_<UUID>_<renameTs>.plog →
+   * UUID is the third underscore-separated part.
    * @param file - The file path to extract UUID from.
-   * @return Optional of UUID if file was in progress, else Optional.empty()
+   * @return Optional of UUID if file is in-progress format, else 
Optional.empty()
    */
   protected Optional<String> getFileUUID(Path file) throws 
NumberFormatException {
     String[] parts = file.getName().split("_");
-    if (parts.length < 3) {
+    if (parts.length < 4) {
       return Optional.empty();
     }
-    return Optional.of(parts[parts.length - 1].split("\\.")[0]);
+    return Optional.of(parts[2]);
   }
 
   /**
-   * Extracts everything except the UUID (last part) from a file path. For 
example, from
-   * "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog" This method 
will return
-   * "1704153600000_rs1"
-   * @param file - The file path to extract prefix from.
+   * Extracts the rename timestamp from an in-progress file name. Format:
+   * <ts>_<server>_<UUID>_<renameTs>.plog → renameTs is the last 
underscore-separated part (before
+   * extension).
+   * @param file - The file path to extract rename timestamp from.
+   * @return Optional of rename timestamp, or empty if not present or invalid
    */
-  protected String getFilePrefix(Path file) {
-    String fileName = file.getName();
-    String[] parts = fileName.split("_");
-    if (parts.length < 3) {
-      return fileName.split("\\.")[0]; // Return full filename if no 
underscore found
+  public Optional<Long> getRenameTimestamp(Path file) {
+    String[] parts = file.getName().split("_");
+    if (parts.length < 4) {
+      return Optional.empty();
     }
-
-    // Return everything except the last part (UUID)
-    StringBuilder prefix = new StringBuilder();
-    for (int i = 0; i < parts.length - 1; i++) {
-      if (i > 0) {
-        prefix.append("_");
-      }
-      prefix.append(parts[i]);
+    try {
+      String lastPart = parts[parts.length - 1];
+      String withoutExtension = lastPart.substring(0, 
lastPart.lastIndexOf("."));
+      return Optional.of(Long.parseLong(withoutExtension));
+    } catch (NumberFormatException e) {
+      LOG.warn("Failed to parse rename timestamp from file {}", 
file.getName());
+      return Optional.empty();
     }
+  }
 
-    return prefix.toString();
+  /**
+   * Extracts the stable prefix from a file path. The prefix is the first two 
underscore-separated
+   * parts (<timestamp>_<servername>), which remain stable across renames. For 
example,
+   * "1704153600000_rs1_uuid_renameTs.plog" returns "1704153600000_rs1" and 
"1704153600000_rs1.plog"
+   * also returns "1704153600000_rs1".
+   * @param file - The file path to extract prefix from.
+   */
+  protected String getFilePrefix(Path file) {
+    String[] parts = file.getName().split("_");
+    if (parts.length < 2) {
+      return file.getName().split("\\.")[0];
+    }
+    String secondPart = parts[1].contains(".") ? parts[1].split("\\.")[0] : 
parts[1];
+    return parts[0] + "_" + secondPart;
   }
 
   /**
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
index 412c832305..caacf175ce 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
@@ -36,7 +36,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -90,6 +92,7 @@ public class ReplicationLogDiscoveryTest {
       replicationShardDirectoryManager));
     fileTracker.init();
 
+    
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
 0);
     discovery = Mockito.spy(new TestableReplicationLogDiscovery(fileTracker));
     Mockito.doReturn(metricsLogDiscovery).when(discovery).getMetrics();
   }
@@ -156,6 +159,24 @@ public class ReplicationLogDiscoveryTest {
     assertFalse("Discovery should not be running after stop", 
discovery.isRunning());
   }
 
+  @Test
+  public void testGetInProgressFileMaxRetries() {
+    // Default value
+    assertEquals("Default maxRetries should be 1",
+      ReplicationLogDiscovery.DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES,
+      discovery.getInProgressFileMaxRetries());
+
+    // Custom value from configuration
+    
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
 5);
+    assertEquals("maxRetries should reflect configured value", 5,
+      discovery.getInProgressFileMaxRetries());
+
+    // Reset to verify it reads from conf each time
+    
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
 3);
+    assertEquals("maxRetries should reflect updated configured value", 3,
+      discovery.getInProgressFileMaxRetries());
+  }
+
   /**
    * Tests processRound with in-progress directory processing enabled. 
Validates that both new files
    * and in-progress files are processed correctly.
@@ -215,14 +236,14 @@ public class ReplicationLogDiscoveryTest {
       // For in-progress files, they already have format: 
{timestamp}_{rs-n}_{UUID}.plog
       // Extract prefix before the UUID (everything before the last underscore)
       String fileName = file.getName();
-      String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
+      String prefix = extractPrefix(fileName);
       expectedProcessedFilePrefixes.add(prefix);
     }
     for (Path file : inProgressFiles0102) {
       // For in-progress files, they already have format: 
{timestamp}_{rs-n}_{UUID}.plog
       // Extract prefix before the UUID (everything before the last underscore)
       String fileName = file.getName();
-      String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
+      String prefix = extractPrefix(fileName);
       expectedProcessedFilePrefixes.add(prefix);
     }
 
@@ -231,12 +252,7 @@ public class ReplicationLogDiscoveryTest {
     Set<String> actualProcessedFilePrefixes = new HashSet<>();
     for (Path file : processedFiles) {
       String fileName = file.getName();
-      // Extract prefix before UUID and extension (everything before the last 
underscore before
-      // .plog)
-      // Remove the extension first
-      String withoutExtension = fileName.substring(0, 
fileName.lastIndexOf("."));
-      // Then get everything before the last underscore (which is the UUID)
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
+      String prefix = extractPrefix(fileName);
       actualProcessedFilePrefixes.add(prefix);
     }
 
@@ -256,16 +272,14 @@ public class ReplicationLogDiscoveryTest {
     }
     // For in-progress files
     for (Path expectedFile : inProgressFiles0004) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markInProgress(Mockito.argThat(path -> path
-        .getName().substring(0, 
path.getName().lastIndexOf("_")).equals(expectedPrefix)));
+      String expectedPrefix = extractPrefix(expectedFile.getName());
+      Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
     for (Path expectedFile : inProgressFiles0102) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markInProgress(Mockito.argThat(path -> path
-        .getName().substring(0, 
path.getName().lastIndexOf("_")).equals(expectedPrefix)));
+      String expectedPrefix = extractPrefix(expectedFile.getName());
+      Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
 
     // Verify that markCompleted was called 7 times (once for each 
successfully processed file)
@@ -276,33 +290,19 @@ public class ReplicationLogDiscoveryTest {
     for (Path expectedFile : newFilesForRound) {
       String expectedPrefix =
         expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("."));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
+      Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
     // For in-progress files (they will have updated UUIDs, but same prefix)
     for (Path expectedFile : inProgressFiles0004) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
+      String expectedPrefix = extractPrefix(expectedFile.getName());
+      Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
     for (Path expectedFile : inProgressFiles0102) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
+      String expectedPrefix = extractPrefix(expectedFile.getName());
+      Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
 
     // Verify that shouldProcessInProgressDirectory was called once
@@ -388,8 +388,7 @@ public class ReplicationLogDiscoveryTest {
     Set<String> actualProcessedFilePrefixes = new HashSet<>();
     for (Path file : processedFiles) {
       String fileName = file.getName();
-      String withoutExtension = fileName.substring(0, 
fileName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
+      String prefix = extractPrefix(fileName);
       actualProcessedFilePrefixes.add(prefix);
     }
 
@@ -413,12 +412,8 @@ public class ReplicationLogDiscoveryTest {
     for (Path expectedFile : newFilesForRound) {
       String expectedPrefix =
         expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("."));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
+      Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
 
     // Verify that shouldProcessInProgressDirectory was called once
@@ -448,16 +443,14 @@ public class ReplicationLogDiscoveryTest {
 
     // Validate that in-progress files were NOT processed
     for (Path unexpectedFile : inProgressFiles0004) {
-      String unexpectedPrefix =
-        unexpectedFile.getName().substring(0, 
unexpectedFile.getName().lastIndexOf("_"));
+      String unexpectedPrefix = extractPrefix(unexpectedFile.getName());
       assertFalse(
         "Should NOT have processed in-progress file from 00:00:04: " + 
unexpectedFile.getName(),
         actualProcessedFilePrefixes.contains(unexpectedPrefix));
     }
 
     for (Path unexpectedFile : inProgressFiles0102) {
-      String unexpectedPrefix =
-        unexpectedFile.getName().substring(0, 
unexpectedFile.getName().lastIndexOf("_"));
+      String unexpectedPrefix = extractPrefix(unexpectedFile.getName());
       assertFalse(
         "Should NOT have processed in-progress file from 00:01:02: " + 
unexpectedFile.getName(),
         actualProcessedFilePrefixes.contains(unexpectedPrefix));
@@ -560,8 +553,7 @@ public class ReplicationLogDiscoveryTest {
     Set<String> actualProcessedFilePrefixes = new HashSet<>();
     for (Path file : processedFiles) {
       String fileName = file.getName();
-      String withoutExtension = fileName.substring(0, 
fileName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
+      String prefix = extractPrefix(fileName);
       actualProcessedFilePrefixes.add(prefix);
     }
 
@@ -585,12 +577,8 @@ public class ReplicationLogDiscoveryTest {
     for (Path expectedFile : newFilesForRound) {
       String expectedPrefix =
         expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("."));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
+      Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
 
     // Validate that files from other rounds were NOT processed (using prefix 
comparison)
@@ -618,16 +606,14 @@ public class ReplicationLogDiscoveryTest {
     // Validate that in-progress files were NOT processed 
(processNewFilesForRound only processes
     // new files)
     for (Path unexpectedFile : inProgressFiles0004) {
-      String unexpectedPrefix =
-        unexpectedFile.getName().substring(0, 
unexpectedFile.getName().lastIndexOf("_"));
+      String unexpectedPrefix = extractPrefix(unexpectedFile.getName());
       assertFalse(
         "Should NOT have processed in-progress file from 00:00:04: " + 
unexpectedFile.getName(),
         actualProcessedFilePrefixes.contains(unexpectedPrefix));
     }
 
     for (Path unexpectedFile : inProgressFiles0102) {
-      String unexpectedPrefix =
-        unexpectedFile.getName().substring(0, 
unexpectedFile.getName().lastIndexOf("_"));
+      String unexpectedPrefix = extractPrefix(unexpectedFile.getName());
       assertFalse(
         "Should NOT have processed in-progress file from 00:01:02: " + 
unexpectedFile.getName(),
         actualProcessedFilePrefixes.contains(unexpectedPrefix));
@@ -651,21 +637,11 @@ public class ReplicationLogDiscoveryTest {
     String file1Prefix = newFilesForRound.get(1).getName().substring(0,
       newFilesForRound.get(1).getName().lastIndexOf("."));
     Mockito.doThrow(new IOException("Processing failed for file 
1")).when(discovery)
-      .processFile(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(file1Prefix);
-      }));
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
     String file3Prefix = newFilesForRound.get(3).getName().substring(0,
       newFilesForRound.get(3).getName().lastIndexOf("."));
     Mockito.doThrow(new IOException("Processing failed for file 
3")).when(discovery)
-      .processFile(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(file3Prefix);
-      }));
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file3Prefix)));
 
     // Process new files for the round
     discovery.processNewFilesForRound(replicationRound);
@@ -686,12 +662,8 @@ public class ReplicationLogDiscoveryTest {
     for (Path expectedFile : newFilesForRound) {
       String expectedPrefix =
         expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("."));
-      Mockito.verify(discovery, 
Mockito.times(1)).processFile(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
+      Mockito.verify(discovery, Mockito.times(1))
+        .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
 
     // Verify that markCompleted was called for each successfully processed 
file
@@ -700,80 +672,40 @@ public class ReplicationLogDiscoveryTest {
     // Verify that markCompleted was called for each successfully processed 
file with correct paths
     String expectedPrefix0 = newFilesForRound.get(0).getName().substring(0,
       newFilesForRound.get(0).getName().lastIndexOf("."));
-    Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(expectedPrefix0);
-    }));
+    Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+      Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix0)));
     String expectedPrefix2 = newFilesForRound.get(2).getName().substring(0,
       newFilesForRound.get(2).getName().lastIndexOf("."));
-    Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(expectedPrefix2);
-    }));
+    Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+      Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix2)));
     String expectedPrefix4 = newFilesForRound.get(4).getName().substring(0,
       newFilesForRound.get(4).getName().lastIndexOf("."));
-    Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(expectedPrefix4);
-    }));
+    Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+      Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix4)));
 
     // Verify that markCompleted was NOT called for failed files
     String unexpectedPrefix1 = newFilesForRound.get(1).getName().substring(0,
       newFilesForRound.get(1).getName().lastIndexOf("."));
-    Mockito.verify(fileTracker, 
Mockito.never()).markCompleted(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(unexpectedPrefix1);
-    }));
+    Mockito.verify(fileTracker, Mockito.never()).markCompleted(
+      Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(unexpectedPrefix1)));
     String unexpectedPrefix3 = newFilesForRound.get(3).getName().substring(0,
       newFilesForRound.get(3).getName().lastIndexOf("."));
-    Mockito.verify(fileTracker, 
Mockito.never()).markCompleted(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(unexpectedPrefix3);
-    }));
+    Mockito.verify(fileTracker, Mockito.never()).markCompleted(
+      Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(unexpectedPrefix3)));
 
     // Verify that markFailed was called for failed files
-    Mockito.verify(fileTracker, 
Mockito.times(1)).markFailed(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(unexpectedPrefix1);
-    }));
-    Mockito.verify(fileTracker, 
Mockito.times(1)).markFailed(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(unexpectedPrefix3);
-    }));
+    Mockito.verify(fileTracker, Mockito.times(1))
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(unexpectedPrefix1)));
+    Mockito.verify(fileTracker, Mockito.times(1))
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(unexpectedPrefix3)));
 
     // Verify that markFailed was NOT called for successfully processed files
-    Mockito.verify(fileTracker, 
Mockito.never()).markFailed(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(expectedPrefix0);
-    }));
-    Mockito.verify(fileTracker, 
Mockito.never()).markFailed(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(expectedPrefix2);
-    }));
-    Mockito.verify(fileTracker, 
Mockito.never()).markFailed(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(expectedPrefix4);
-    }));
+    Mockito.verify(fileTracker, Mockito.never())
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix0)));
+    Mockito.verify(fileTracker, Mockito.never())
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix2)));
+    Mockito.verify(fileTracker, Mockito.never())
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix4)));
   }
 
   /**
@@ -793,12 +725,8 @@ public class ReplicationLogDiscoveryTest {
     for (Path file : newFilesForRound) {
       String filePrefix = file.getName().substring(0, 
file.getName().lastIndexOf("."));
       Mockito.doThrow(new IOException("Processing failed for file: " + 
file.getName()))
-        .when(discovery).processFile(Mockito.argThat(path -> {
-          String pathName = path.getName();
-          String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-          String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-          return prefix.equals(filePrefix);
-        }));
+        .when(discovery)
+        .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(filePrefix)));
     }
 
     // Process new files for the round
@@ -820,12 +748,8 @@ public class ReplicationLogDiscoveryTest {
     for (Path expectedFile : newFilesForRound) {
       String expectedPrefix =
         expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("."));
-      Mockito.verify(discovery, 
Mockito.times(1)).processFile(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
+      Mockito.verify(discovery, Mockito.times(1))
+        .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
 
     // Verify that markCompleted was NOT called for any file (all failed)
@@ -838,12 +762,8 @@ public class ReplicationLogDiscoveryTest {
     for (Path failedFile : newFilesForRound) {
       String expectedPrefix =
         failedFile.getName().substring(0, 
failedFile.getName().lastIndexOf("."));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markFailed(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
   }
 
@@ -853,140 +773,143 @@ public class ReplicationLogDiscoveryTest {
    */
   @Test
   public void testProcessInProgressDirectory() throws IOException {
-    // 1. Create in-progress files for different timestamps
-    long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
-    List<Path> inProgressFiles0004 = createInProgressFiles(timestamp0004, 3);
+    // Inject an advancing clock so we can verify rename timestamps are recent
+    long initialTime = 1704153800000L;
+    long originalRenameTimestamp = 1704153604000L;
+    AtomicLong clock = new AtomicLong(initialTime);
+    EnvironmentEdge edge = clock::getAndIncrement;
+    EnvironmentEdgeManager.injectEdge(edge);
 
-    long timestamp0102 = 1704153660000L + (2 * 1000L); // 00:01:02
-    List<Path> inProgressFiles0102 = createInProgressFiles(timestamp0102, 2);
+    try {
+      // 1. Create in-progress files for different timestamps with old rename 
timestamps
+      long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
+      List<Path> inProgressFiles0004 =
+        createInProgressFiles(timestamp0004, 3, originalRenameTimestamp);
 
-    long timestamp0206 = 1704153720000L + (6 * 1000L); // 00:02:06
-    List<Path> inProgressFiles0206 = createInProgressFiles(timestamp0206, 2);
+      long timestamp0102 = 1704153660000L + (2 * 1000L); // 00:01:02
+      List<Path> inProgressFiles0102 =
+        createInProgressFiles(timestamp0102, 2, originalRenameTimestamp);
 
-    // 2. Create some new files to ensure they are NOT processed
-    ReplicationRound replicationRound = new ReplicationRound(1704153600000L, 
1704153660000L); // 00:00:00
-                                                                               
               // -
-                                                                               
               // 00:01:00
-    List<Path> newFilesForRound = createNewFilesForRound(replicationRound, 3);
+      long timestamp0206 = 1704153720000L + (6 * 1000L); // 00:02:06
+      List<Path> inProgressFiles0206 =
+        createInProgressFiles(timestamp0206, 2, originalRenameTimestamp);
 
-    // Process in-progress directory
-    discovery.processInProgressDirectory();
+      // 2. Create some new files to ensure they are NOT processed
+      ReplicationRound replicationRound = new ReplicationRound(1704153600000L, 
1704153660000L); // 00:00:00
+                                                                               
                 // -
+                                                                               
                 // 00:01:00
+      List<Path> newFilesForRound = createNewFilesForRound(replicationRound, 
3);
 
-    // 3. Ensure all in-progress files (7 total) are processed
-    List<Path> processedFiles = discovery.getProcessedFiles();
-    assertEquals("Invalid number of files processed", 7, 
processedFiles.size());
+      // Process in-progress directory
+      discovery.processInProgressDirectory();
 
-    // Create set of expected files that should be processed (by prefix, since 
UUIDs are updated
-    // during markInProgress)
-    Set<String> expectedProcessedFilePrefixes = new HashSet<>();
-    for (Path file : inProgressFiles0004) {
-      String fileName = file.getName();
-      String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
-      expectedProcessedFilePrefixes.add(prefix);
-    }
-    for (Path file : inProgressFiles0102) {
-      String fileName = file.getName();
-      String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
-      expectedProcessedFilePrefixes.add(prefix);
-    }
-    for (Path file : inProgressFiles0206) {
-      String fileName = file.getName();
-      String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
-      expectedProcessedFilePrefixes.add(prefix);
-    }
+      // 3. Ensure all in-progress files (7 total) are processed
+      List<Path> processedFiles = discovery.getProcessedFiles();
+      assertEquals("Invalid number of files processed", 7, 
processedFiles.size());
 
-    // Create set of actually processed file paths (extract prefixes)
-    Set<String> actualProcessedFilePrefixes = new HashSet<>();
-    for (Path file : processedFiles) {
-      String fileName = file.getName();
-      String withoutExtension = fileName.substring(0, 
fileName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      actualProcessedFilePrefixes.add(prefix);
-    }
+      // Create set of expected files that should be processed (by prefix, 
since UUIDs are updated
+      // during markInProgress)
+      Set<String> expectedProcessedFilePrefixes = new HashSet<>();
+      for (Path file : inProgressFiles0004) {
+        String fileName = file.getName();
+        String prefix = extractPrefix(fileName);
+        expectedProcessedFilePrefixes.add(prefix);
+      }
+      for (Path file : inProgressFiles0102) {
+        String fileName = file.getName();
+        String prefix = extractPrefix(fileName);
+        expectedProcessedFilePrefixes.add(prefix);
+      }
+      for (Path file : inProgressFiles0206) {
+        String fileName = file.getName();
+        String prefix = extractPrefix(fileName);
+        expectedProcessedFilePrefixes.add(prefix);
+      }
 
-    // Validate that sets are equal
-    assertEquals("Expected and actual processed files should match", 
expectedProcessedFilePrefixes,
-      actualProcessedFilePrefixes);
+      // Create set of actually processed file paths (extract prefixes)
+      Set<String> actualProcessedFilePrefixes = new HashSet<>();
+      for (Path file : processedFiles) {
+        String fileName = file.getName();
+        String prefix = extractPrefix(fileName);
+        actualProcessedFilePrefixes.add(prefix);
+      }
 
-    // Verify that markInProgress was called 7 times
-    Mockito.verify(fileTracker, 
Mockito.times(7)).markInProgress(Mockito.any(Path.class));
+      // Validate that sets are equal
+      assertEquals("Expected and actual processed files should match",
+        expectedProcessedFilePrefixes, actualProcessedFilePrefixes);
 
-    // Verify that markInProgress was called for each expected file
-    for (Path expectedFile : inProgressFiles0004) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markInProgress(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
-    }
-    for (Path expectedFile : inProgressFiles0102) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markInProgress(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
-    }
-    for (Path expectedFile : inProgressFiles0206) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markInProgress(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
-    }
+      // Verify that markInProgress was called 7 times
+      Mockito.verify(fileTracker, 
Mockito.times(7)).markInProgress(Mockito.any(Path.class));
 
-    // Verify that markCompleted was called for each processed file
-    Mockito.verify(fileTracker, 
Mockito.times(7)).markCompleted(Mockito.any(Path.class));
+      // Verify that markInProgress was called for each expected file
+      for (Path expectedFile : inProgressFiles0004) {
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+      }
+      for (Path expectedFile : inProgressFiles0102) {
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+      }
+      for (Path expectedFile : inProgressFiles0206) {
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+      }
 
-    // Verify that markCompleted was called for each processed file with 
correct paths
-    for (Path expectedFile : inProgressFiles0004) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
-    }
-    for (Path expectedFile : inProgressFiles0102) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
-    }
-    for (Path expectedFile : inProgressFiles0206) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
-    }
+      // Verify that markCompleted was called for each processed file
+      Mockito.verify(fileTracker, 
Mockito.times(7)).markCompleted(Mockito.any(Path.class));
 
-    // Validate that new files were NOT processed (processInProgressDirectory 
only processes
-    // in-progress files)
-    for (Path unexpectedFile : newFilesForRound) {
-      String unexpectedPrefix =
-        unexpectedFile.getName().substring(0, 
unexpectedFile.getName().lastIndexOf("."));
-      assertFalse("Should NOT have processed new file: " + 
unexpectedFile.getName(),
-        actualProcessedFilePrefixes.contains(unexpectedPrefix));
+      // Verify that markCompleted was called for each processed file with 
correct paths
+      for (Path expectedFile : inProgressFiles0004) {
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+      }
+      for (Path expectedFile : inProgressFiles0102) {
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+      }
+      for (Path expectedFile : inProgressFiles0206) {
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+      }
+
+      // Validate that new files were NOT processed 
(processInProgressDirectory only processes
+      // in-progress files)
+      for (Path unexpectedFile : newFilesForRound) {
+        String unexpectedPrefix =
+          unexpectedFile.getName().substring(0, 
unexpectedFile.getName().lastIndexOf("."));
+        assertFalse("Should NOT have processed new file: " + 
unexpectedFile.getName(),
+          actualProcessedFilePrefixes.contains(unexpectedPrefix));
+      }
+
+      // Verify that all processed files had rename timestamps updated to 
recent values
+      // (not the original old rename timestamp)
+      for (Path processedFile : processedFiles) {
+        Optional<Long> renameTs = 
fileTracker.getRenameTimestamp(processedFile);
+        assertTrue("Processed file should have a rename timestamp: " + 
processedFile.getName(),
+          renameTs.isPresent());
+        assertTrue("Processed file's rename timestamp should be recent (>= 
initialTime), but was "
+          + renameTs.get() + " for " + processedFile.getName(), renameTs.get() 
>= initialTime);
+        assertTrue(
+          "Processed file's rename timestamp should be newer than the original 
("
+            + originalRenameTimestamp + "), but was " + renameTs.get(),
+          renameTs.get() > originalRenameTimestamp);
+      }
+    } finally {
+      EnvironmentEdgeManager.reset();
     }
   }
 
   @Test
   public void testProcessInProgressDirectoryWithIntermittentFailure() throws 
IOException {
+    // Allow 2 retries so files that fail once can succeed on retry
+    
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
 2);
+
     // Create in-progress files for different timestamps
     long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
     List<Path> inProgressFiles0004 = createInProgressFiles(timestamp0004, 3);
@@ -1001,24 +924,14 @@ public class ReplicationLogDiscoveryTest {
 
     // Mock processFile to throw exception for specific files (files 1 and 3) 
only on first call,
     // succeed on retry
-    String file1Prefix = allInProgressFiles.get(1).getName().substring(0,
-      allInProgressFiles.get(1).getName().lastIndexOf("_"));
+    String file1Prefix = extractPrefix(allInProgressFiles.get(1).getName());
     Mockito.doThrow(new IOException("Processing failed for file 
1")).doCallRealMethod()
-      .when(discovery).processFile(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(file1Prefix);
-      }));
-    String file3Prefix = allInProgressFiles.get(3).getName().substring(0,
-      allInProgressFiles.get(3).getName().lastIndexOf("_"));
+      .when(discovery)
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+    String file3Prefix = extractPrefix(allInProgressFiles.get(3).getName());
     Mockito.doThrow(new IOException("Processing failed for file 
3")).doCallRealMethod()
-      .when(discovery).processFile(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(file3Prefix);
-      }));
+      .when(discovery)
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file3Prefix)));
 
     // Process in-progress directory
     discovery.processInProgressDirectory();
@@ -1030,15 +943,10 @@ public class ReplicationLogDiscoveryTest {
     // Files 1 and 3 are called twice (initial attempt + retry), others once
     for (int i = 0; i < allInProgressFiles.size(); i++) {
       Path expectedFile = allInProgressFiles.get(i);
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
+      String expectedPrefix = extractPrefix(expectedFile.getName());
       int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are 
retried
-      Mockito.verify(fileTracker, Mockito.times(expectedTimes))
-        .markInProgress(Mockito.argThat(path -> {
-          String pathName = path.getName();
-          String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
-          return prefix.equals(expectedPrefix);
-        }));
+      Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress(
+        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
 
     // Verify that processFile was called for each file in the directory (i.e. 
5 + 2 times for
@@ -1049,16 +957,11 @@ public class ReplicationLogDiscoveryTest {
     // Files 1 and 3 should be called twice (fail once, succeed on retry), 
others once
     for (int i = 0; i < allInProgressFiles.size(); i++) {
       Path expectedFile = allInProgressFiles.get(i);
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
+      String expectedPrefix = extractPrefix(expectedFile.getName());
       int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are 
called twice (fail +
                                                       // retry success)
-      Mockito.verify(discovery, 
Mockito.times(expectedTimes)).processFile(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
-      }));
+      Mockito.verify(discovery, Mockito.times(expectedTimes))
+        .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
     }
 
     // Verify that markCompleted was called for each successfully processed 
file
@@ -1068,80 +971,272 @@ public class ReplicationLogDiscoveryTest {
     Mockito.verify(fileTracker, 
Mockito.times(2)).markFailed(Mockito.any(Path.class));
 
     // Verify that markFailed was called once ONLY for failed files
-    String failedPrefix1 = allInProgressFiles.get(1).getName().substring(0,
-      allInProgressFiles.get(1).getName().lastIndexOf("_"));
-    Mockito.verify(fileTracker, 
Mockito.times(1)).markFailed(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(failedPrefix1);
-    }));
-    String failedPrefix3 = allInProgressFiles.get(3).getName().substring(0,
-      allInProgressFiles.get(3).getName().lastIndexOf("_"));
-    Mockito.verify(fileTracker, 
Mockito.times(1)).markFailed(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(failedPrefix3);
-    }));
+    String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName());
+    Mockito.verify(fileTracker, Mockito.times(1))
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(failedPrefix1)));
+    String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName());
+    Mockito.verify(fileTracker, Mockito.times(1))
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(failedPrefix3)));
 
     // Verify that markFailed was NOT called for files processed successfully 
in first iteration
-    String successPrefix0 = allInProgressFiles.get(0).getName().substring(0,
-      allInProgressFiles.get(0).getName().lastIndexOf("_"));
-    Mockito.verify(fileTracker, 
Mockito.never()).markFailed(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(successPrefix0);
-    }));
-    String successPrefix2 = allInProgressFiles.get(2).getName().substring(0,
-      allInProgressFiles.get(2).getName().lastIndexOf("_"));
-    Mockito.verify(fileTracker, 
Mockito.never()).markFailed(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(successPrefix2);
-    }));
-    String successPrefix4 = allInProgressFiles.get(4).getName().substring(0,
-      allInProgressFiles.get(4).getName().lastIndexOf("_"));
-    Mockito.verify(fileTracker, 
Mockito.never()).markFailed(Mockito.argThat(path -> {
-      String pathName = path.getName();
-      String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-      String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-      return prefix.equals(successPrefix4);
-    }));
+    String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName());
+    Mockito.verify(fileTracker, Mockito.never())
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(successPrefix0)));
+    String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName());
+    Mockito.verify(fileTracker, Mockito.never())
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(successPrefix2)));
+    String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName());
+    Mockito.verify(fileTracker, Mockito.never())
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(successPrefix4)));
 
     // Verify that markCompleted was called for each successfully processed 
file with correct paths
     for (Path expectedFile : allInProgressFiles) {
-      String expectedPrefix =
-        expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-      Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-        String pathName = path.getName();
-        String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-        return prefix.equals(expectedPrefix);
+      String expectedPrefix = extractPrefix(expectedFile.getName());
+      Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+    }
+  }
+
+  /**
+   * Tests that with default maxRetries=1, files that fail processing are 
skipped for the rest of
+   * the round and not retried. Verifies that successfully processed files are 
completed, while
+   * failed files are only marked as failed without retry.
+   */
+  @Test
+  public void testProcessInProgressDirectorySkipsFailedFiles() throws 
IOException {
+    // Use default maxRetries=1 (no explicit config override)
+    // Create in-progress files
+    long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
+    List<Path> inProgressFiles = createInProgressFiles(timestamp0004, 3);
+
+    // Mock processFile to always throw for file 1 (persistent failure)
+    String file1Prefix = extractPrefix(inProgressFiles.get(1).getName());
+    Mockito.doThrow(new IOException("Persistent failure for file 
1")).when(discovery)
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+
+    // Process in-progress directory
+    discovery.processInProgressDirectory();
+
+    // With maxRetries=1, file 1 should fail once and be skipped (no retry)
+    // Files 0 and 2 should succeed
+    // Total markInProgress calls: 3 (one per file, no retries)
+    Mockito.verify(fileTracker, 
Mockito.times(3)).markInProgress(Mockito.any(Path.class));
+
+    // processFile called 3 times (all files attempted once)
+    Mockito.verify(discovery, 
Mockito.times(3)).processFile(Mockito.any(Path.class));
+
+    // markCompleted called only for the 2 successful files
+    Mockito.verify(fileTracker, 
Mockito.times(2)).markCompleted(Mockito.any(Path.class));
+
+    // markFailed called once for the failed file
+    Mockito.verify(fileTracker, 
Mockito.times(1)).markFailed(Mockito.any(Path.class));
+
+    // Verify the failed file was NOT retried (processFile called exactly once 
for file1)
+    Mockito.verify(discovery, Mockito.times(1))
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+
+    // Verify the successful files were each called exactly once
+    String file0Prefix = extractPrefix(inProgressFiles.get(0).getName());
+    Mockito.verify(discovery, Mockito.times(1))
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file0Prefix)));
+    String file2Prefix = extractPrefix(inProgressFiles.get(2).getName());
+    Mockito.verify(discovery, Mockito.times(1))
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file2Prefix)));
+  }
+
+  /**
+   * Tests that when all in-progress files fail processing, the loop 
terminates without retrying any
+   * file (default maxRetries=1). Verifies no infinite loop occurs and all 
files are marked as
+   * failed.
+   */
+  @Test
+  public void testProcessInProgressDirectoryAllFilesFail() throws IOException {
+    // Use default maxRetries=1
+    // Create in-progress files
+    long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
+    List<Path> inProgressFiles = createInProgressFiles(timestamp0004, 3);
+
+    String file0Prefix = extractPrefix(inProgressFiles.get(0).getName());
+    String file1Prefix = extractPrefix(inProgressFiles.get(1).getName());
+    String file2Prefix = extractPrefix(inProgressFiles.get(2).getName());
+
+    // Mock processFile to always throw for all files
+    Mockito.doThrow(new IOException("Persistent failure")).when(discovery)
+      .processFile(Mockito.any(Path.class));
+
+    // Process in-progress directory - should terminate without infinite loop
+    discovery.processInProgressDirectory();
+
+    // Each file attempted exactly once
+    Mockito.verify(fileTracker, 
Mockito.times(3)).markInProgress(Mockito.any(Path.class));
+    Mockito.verify(discovery, 
Mockito.times(3)).processFile(Mockito.any(Path.class));
+
+    // Verify per-prefix: each file attempted once
+    Mockito.verify(discovery, Mockito.times(1))
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file0Prefix)));
+    Mockito.verify(discovery, Mockito.times(1))
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+    Mockito.verify(discovery, Mockito.times(1))
+      .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file2Prefix)));
+
+    // All files marked as failed
+    Mockito.verify(fileTracker, 
Mockito.times(3)).markFailed(Mockito.any(Path.class));
+    Mockito.verify(fileTracker, Mockito.times(1))
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file0Prefix)));
+    Mockito.verify(fileTracker, Mockito.times(1))
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+    Mockito.verify(fileTracker, Mockito.times(1))
+      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file2Prefix)));
+
+    // No file completed
+    Mockito.verify(fileTracker, 
Mockito.never()).markCompleted(Mockito.any(Path.class));
+  }
+
+  /**
+   * Tests that the failure tracking is scoped per invocation of 
processInProgressDirectory. Files
+   * that fail in the first invocation are skipped for that round, but get a 
fresh chance in the
+   * second invocation where they succeed.
+   */
+  @Test
+  public void testProcessInProgressDirectoryFailedFilesSucceedOnNextRound() 
throws IOException {
+    // Use default maxRetries=1
+    // Inject an advancing clock so we can verify rename timestamps
+    long initialTime = 1704153800000L;
+    long originalRenameTimestamp = 1704153604000L;
+    AtomicLong clock = new AtomicLong(initialTime);
+    EnvironmentEdge edge = clock::getAndIncrement;
+    EnvironmentEdgeManager.injectEdge(edge);
+
+    try {
+      // Create in-progress files with old rename timestamps
+      long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
+      List<Path> inProgressFiles = createInProgressFiles(timestamp0004, 3, 
originalRenameTimestamp);
+
+      // Extract prefixes for verification
+      String file0Prefix = extractPrefix(inProgressFiles.get(0).getName());
+      String file1Prefix = extractPrefix(inProgressFiles.get(1).getName());
+      String file2Prefix = extractPrefix(inProgressFiles.get(2).getName());
+
+      // Verify original rename timestamp
+      for (Path file : inProgressFiles) {
+        Optional<Long> ts = fileTracker.getRenameTimestamp(file);
+        assertTrue("Original file should have rename timestamp", 
ts.isPresent());
+        assertEquals("Original rename timestamp should match creation value",
+          originalRenameTimestamp, ts.get().longValue());
+      }
+
+      // Mock processFile to fail for file 1 only on the first call, succeed 
on subsequent calls
+      Mockito.doThrow(new IOException("Transient failure for file 
1")).doCallRealMethod()
+        .when(discovery)
+        .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+
+      // --- First invocation: file 1 fails, files 0 and 2 succeed ---
+      discovery.processInProgressDirectory();
+
+      // Verify markInProgress called once per file (3 total, no retries)
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markInProgress(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file0Prefix)));
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markInProgress(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markInProgress(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file2Prefix)));
+
+      // Verify processFile called once per file
+      Mockito.verify(discovery, Mockito.times(1))
+        .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file0Prefix)));
+      Mockito.verify(discovery, Mockito.times(1))
+        .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+      Mockito.verify(discovery, Mockito.times(1))
+        .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file2Prefix)));
+
+      // Verify markCompleted called for files 0 and 2 (successful)
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markCompleted(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file0Prefix)));
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markCompleted(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file2Prefix)));
+
+      // Verify markFailed called only for file 1
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+      Mockito.verify(fileTracker, Mockito.never())
+        .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file0Prefix)));
+      Mockito.verify(fileTracker, Mockito.never())
+        .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file2Prefix)));
+
+      // After first round: file 1 should still be in in-progress with an 
UPDATED rename timestamp
+      // (not the original old one)
+      Path inProgressDir = fileTracker.getInProgressDirPath();
+      FileStatus[] remainingFiles = localFs.listStatus(inProgressDir);
+      assertEquals("Only file 1 should remain in in-progress after first 
round", 1,
+        remainingFiles.length);
+      Path failedFilePath = remainingFiles[0].getPath();
+      assertEquals("Remaining file should have file 1's prefix", file1Prefix,
+        extractPrefix(failedFilePath.getName()));
+      Optional<Long> updatedRenameTs = 
fileTracker.getRenameTimestamp(failedFilePath);
+      assertTrue("Failed file should have a rename timestamp", 
updatedRenameTs.isPresent());
+      assertTrue("Failed file's rename timestamp should be newer than the 
original",
+        updatedRenameTs.get() > originalRenameTimestamp);
+      assertTrue("Failed file's rename timestamp should be recent (>= 
initialTime)",
+        updatedRenameTs.get() >= initialTime);
+      long firstRoundRenameTs = updatedRenameTs.get();
+
+      // --- Second invocation: file 1 should be retried and succeed ---
+      Mockito.clearInvocations(fileTracker, discovery);
+
+      discovery.processInProgressDirectory();
+
+      // Verify only file 1 was picked up (files 0 and 2 already completed)
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markInProgress(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+      Mockito.verify(fileTracker, Mockito.never())
+        .markInProgress(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file0Prefix)));
+      Mockito.verify(fileTracker, Mockito.never())
+        .markInProgress(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file2Prefix)));
+
+      // Verify processFile called only for file 1 and it succeeded
+      Mockito.verify(discovery, 
Mockito.times(1)).processFile(Mockito.argThat(path -> {
+        if (!extractPrefix(path.getName()).equals(file1Prefix)) {
+          return false;
+        }
+        // Verify the path passed to processFile has a rename timestamp newer 
than
+        // the first round's rename timestamp
+        Optional<Long> ts = fileTracker.getRenameTimestamp(path);
+        assertTrue("processFile path should have rename timestamp", 
ts.isPresent());
+        assertTrue("Second round rename timestamp should be newer than first 
round's",
+          ts.get() > firstRoundRenameTs);
+        return true;
       }));
+
+      // Verify markCompleted called for file 1
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markCompleted(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file1Prefix)));
+
+      // Verify markFailed was not called in the second round
+      Mockito.verify(fileTracker, 
Mockito.never()).markFailed(Mockito.any(Path.class));
+    } finally {
+      EnvironmentEdgeManager.reset();
     }
   }
 
   /**
-   * Tests processing of in-progress directory when no files meet the 
timestamp criteria. Validates
-   * that no files are processed when all files are too recent.
+   * Tests processing of in-progress directory when no files meet the rename 
timestamp criteria.
+   * Validates that no files are processed when all files were recently 
renamed.
    */
   @Test
   public void testProcessInProgressDirectoryWithNoOldFiles() throws 
IOException {
-    // Set up current time for consistent testing
+    // Set up current time and override min age to 60 seconds for this test
     long currentTime = 1704153660000L; // 00:01:00
+    
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
 60);
     EnvironmentEdge edge = () -> currentTime;
     EnvironmentEdgeManager.injectEdge(edge);
 
     try {
-      // Create only recent files (all within the threshold)
-      long recentTimestamp1 = 1704153655000L; // 00:00:55 (5 seconds old)
-      long recentTimestamp2 = 1704153658000L; // 00:00:58 (2 seconds old)
+      // Create files with recent rename timestamps (within the 60-second min 
age window)
+      long recentRename1 = currentTime - 5000L; // renamed 5 seconds ago
+      long recentRename2 = currentTime - 2000L; // renamed 2 seconds ago
 
-      List<Path> recentFiles1 = createInProgressFiles(recentTimestamp1, 2);
-      List<Path> recentFiles2 = createInProgressFiles(recentTimestamp2, 2);
+      List<Path> recentFiles1 = createInProgressFiles(1704153600000L, 2, 
recentRename1);
+      List<Path> recentFiles2 = createInProgressFiles(1704153600000L, 2, 
recentRename2);
 
       // Process in-progress directory
       discovery.processInProgressDirectory();
@@ -1149,39 +1244,40 @@ public class ReplicationLogDiscoveryTest {
       // Get processed files
       List<Path> processedFiles = discovery.getProcessedFiles();
 
-      // Verify that no files were processed (all files are too recent)
-      assertEquals("Should not process any files when all files are too 
recent", 0,
+      // Verify that no files were processed (all rename timestamps are too 
recent)
+      assertEquals("Should not process any files when all files were recently 
renamed", 0,
         processedFiles.size());
 
     } finally {
       EnvironmentEdgeManager.reset();
+      
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
 0);
     }
   }
 
   /**
-   * Tests processing of in-progress directory with timestamp filtering using
-   * getOlderInProgressFiles. Validates that only files older than the 
calculated threshold are
-   * processed, excluding recent files.
+   * Tests processing of in-progress directory with rename timestamp 
filtering. Validates that only
+   * files whose rename timestamp is older than the configured minimum age are 
processed.
    */
   @Test
   public void testProcessInProgressDirectoryWithTimestampFiltering() throws 
IOException {
-    // Set up current time for consistent testing
+    // Set up current time and override min age to 60 seconds for this test
     long currentTime = 1704153660000L; // 00:01:00
+    
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
 60);
     EnvironmentEdge edge = () -> currentTime;
     EnvironmentEdgeManager.injectEdge(edge);
 
     try {
-      // Create files with various ages
-      long veryOldTimestamp = 1704153600000L; // 00:00:00 (1 minute old) - 
should be processed
-      long oldTimestamp = 1704153630000L; // 00:00:30 (30 seconds old) - 
should be processed
-      long recentTimestamp = 1704153655000L; // 00:00:55 (5 seconds old) - 
should NOT be processed
-      long veryRecentTimestamp = 1704153658000L; // 00:00:58 (2 seconds old) - 
should NOT be
-                                                 // processed
-
-      List<Path> veryOldFiles = createInProgressFiles(veryOldTimestamp, 1);
-      List<Path> oldFiles = createInProgressFiles(oldTimestamp, 1);
-      List<Path> recentFiles = createInProgressFiles(recentTimestamp, 1);
-      List<Path> veryRecentFiles = createInProgressFiles(veryRecentTimestamp, 
1);
+      // threshold = currentTime - 60s = 1704153600000
+      // Files renamed before threshold should be processed
+      long veryOldRename = currentTime - 120000L; // renamed 2 min ago - 
should be processed
+      long oldRename = currentTime - 70000L; // renamed 70s ago - should be 
processed
+      long recentRename = currentTime - 5000L; // renamed 5s ago - should NOT 
be processed
+      long veryRecentRename = currentTime - 2000L; // renamed 2s ago - should 
NOT be processed
+
+      List<Path> veryOldFiles = createInProgressFiles(1704153500000L, 1, 
veryOldRename);
+      List<Path> oldFiles = createInProgressFiles(1704153500001L, 1, 
oldRename);
+      List<Path> recentFiles = createInProgressFiles(1704153500002L, 1, 
recentRename);
+      List<Path> veryRecentFiles = createInProgressFiles(1704153500003L, 1, 
veryRecentRename);
 
       // Process in-progress directory
       discovery.processInProgressDirectory();
@@ -1190,7 +1286,7 @@ public class ReplicationLogDiscoveryTest {
       List<Path> processedFiles = discovery.getProcessedFiles();
 
       // Verify that only old files were processed (2 old files, 0 recent 
files)
-      assertEquals("Should process only old files based on timestamp 
filtering", 2,
+      assertEquals("Should process only old files based on rename timestamp 
filtering", 2,
         processedFiles.size());
 
       // Create set of expected processed files (by prefix, since UUIDs are 
updated during
@@ -1198,12 +1294,12 @@ public class ReplicationLogDiscoveryTest {
       Set<String> expectedProcessedFilePrefixes = new HashSet<>();
       for (Path file : veryOldFiles) {
         String fileName = file.getName();
-        String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
+        String prefix = extractPrefix(fileName);
         expectedProcessedFilePrefixes.add(prefix);
       }
       for (Path file : oldFiles) {
         String fileName = file.getName();
-        String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
+        String prefix = extractPrefix(fileName);
         expectedProcessedFilePrefixes.add(prefix);
       }
 
@@ -1211,8 +1307,7 @@ public class ReplicationLogDiscoveryTest {
       Set<String> actualProcessedFilePrefixes = new HashSet<>();
       for (Path file : processedFiles) {
         String fileName = file.getName();
-        String withoutExtension = fileName.substring(0, 
fileName.lastIndexOf("."));
-        String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
+        String prefix = extractPrefix(fileName);
         actualProcessedFilePrefixes.add(prefix);
       }
 
@@ -1225,22 +1320,14 @@ public class ReplicationLogDiscoveryTest {
 
       // Verify that markInProgress was called for each expected file
       for (Path expectedFile : veryOldFiles) {
-        String expectedPrefix =
-          expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-        Mockito.verify(fileTracker, 
Mockito.times(1)).markInProgress(Mockito.argThat(path -> {
-          String pathName = path.getName();
-          String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
-          return prefix.equals(expectedPrefix);
-        }));
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
       }
       for (Path expectedFile : oldFiles) {
-        String expectedPrefix =
-          expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-        Mockito.verify(fileTracker, 
Mockito.times(1)).markInProgress(Mockito.argThat(path -> {
-          String pathName = path.getName();
-          String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
-          return prefix.equals(expectedPrefix);
-        }));
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
       }
 
       // Verify that markCompleted was called 2 times
@@ -1248,42 +1335,31 @@ public class ReplicationLogDiscoveryTest {
 
       // Verify that markCompleted was called for each expected file with 
correct paths
       for (Path expectedFile : veryOldFiles) {
-        String expectedPrefix =
-          expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-        Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-          String pathName = path.getName();
-          String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-          String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-          return prefix.equals(expectedPrefix);
-        }));
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
       }
       for (Path expectedFile : oldFiles) {
-        String expectedPrefix =
-          expectedFile.getName().substring(0, 
expectedFile.getName().lastIndexOf("_"));
-        Mockito.verify(fileTracker, 
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
-          String pathName = path.getName();
-          String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf("."));
-          String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_"));
-          return prefix.equals(expectedPrefix);
-        }));
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
       }
 
       // Verify that recent files were NOT processed
       for (Path file : recentFiles) {
-        String unexpectedPrefix = file.getName().substring(0, 
file.getName().lastIndexOf("_"));
-        assertFalse(
-          "Recent files should not be processed due to timestamp filtering: " 
+ file.getName(),
-          actualProcessedFilePrefixes.contains(unexpectedPrefix));
+        String unexpectedPrefix = extractPrefix(file.getName());
+        assertFalse("Recent files should not be processed due to rename 
timestamp filtering: "
+          + file.getName(), 
actualProcessedFilePrefixes.contains(unexpectedPrefix));
       }
       for (Path file : veryRecentFiles) {
-        String unexpectedPrefix = file.getName().substring(0, 
file.getName().lastIndexOf("_"));
-        assertFalse(
-          "Recent files should not be processed due to timestamp filtering: " 
+ file.getName(),
-          actualProcessedFilePrefixes.contains(unexpectedPrefix));
+        String unexpectedPrefix = extractPrefix(file.getName());
+        assertFalse("Recent files should not be processed due to rename 
timestamp filtering: "
+          + file.getName(), 
actualProcessedFilePrefixes.contains(unexpectedPrefix));
       }
 
     } finally {
       EnvironmentEdgeManager.reset();
+      
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
 0);
     }
   }
 
@@ -1775,14 +1851,32 @@ public class ReplicationLogDiscoveryTest {
     return newFiles;
   }
 
+  /**
+   * Extracts the stable prefix (<timestamp>_<server>) from a filename. Works 
for all formats:
+   * "ts_server.plog", "ts_server_UUID_renameTs.plog"
+   */
+  private static String extractPrefix(String fileName) {
+    String[] parts = fileName.split("_");
+    if (parts.length < 2) {
+      return fileName.split("\\.")[0];
+    }
+    String secondPart = parts[1].contains(".") ? parts[1].split("\\.")[0] : 
parts[1];
+    return parts[0] + "_" + secondPart;
+  }
+
   private List<Path> createInProgressFiles(long timestamp, int count) throws 
IOException {
-    // Create in-progress files
+    return createInProgressFiles(timestamp, count, timestamp);
+  }
+
+  private List<Path> createInProgressFiles(long timestamp, int count, long 
renameTimestamp)
+    throws IOException {
     Path inProgressDir = fileTracker.getInProgressDirPath();
     localFs.mkdirs(inProgressDir);
     List<Path> inProgressFiles = new ArrayList<>();
     for (int i = 0; i < count; i++) {
       String uuid = "12345678-1234-1234-1234-123456789abc" + i;
-      Path inProgressFile = new Path(inProgressDir, timestamp + "_rs-" + i + 
"_" + uuid + ".plog");
+      Path inProgressFile = new Path(inProgressDir,
+        timestamp + "_rs-" + i + "_" + uuid + "_" + renameTimestamp + ".plog");
       localFs.create(inProgressFile, true).close();
       inProgressFiles.add(inProgressFile);
     }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
index 2037b25c0e..6d6d8163a2 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker;
 import 
org.apache.phoenix.replication.metrics.MetricsReplicationLogTrackerReplayImpl;
 import org.apache.phoenix.replication.reader.ReplicationLogReplay;
@@ -631,58 +633,76 @@ public class ReplicationLogTrackerTest {
     // Initialize tracker
     tracker.init();
 
-    // Create a file in a shard directory (without UUID)
-    ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
-    List<Path> allShardPaths = shardManager.getAllShardPaths();
-    Path shardPath = allShardPaths.get(0);
-    localFs.mkdirs(shardPath);
-
-    // Create original file without UUID
-    Path originalFile = new Path(shardPath, "1704153600000_rs1.plog");
-    localFs.create(originalFile, true).close();
+    // Inject a fixed time so we can verify the exact rename timestamp
+    long fixedTime = 1704153700000L;
+    EnvironmentEdge edge = () -> fixedTime;
+    EnvironmentEdgeManager.injectEdge(edge);
 
-    // Verify original file exists
-    assertTrue("Original file should exist", localFs.exists(originalFile));
-
-    // Call markInProgress
-    Optional<Path> result = tracker.markInProgress(originalFile);
-
-    // Verify file system operation counts
-    // markInProgress involves moving file from shard directory to in-progress 
directory
-    // It should call exists() for only in progress directory (during init), 
rename() to move file
-    Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class));
-    Mockito.verify(mockFs, 
times(1)).exists(Mockito.eq(tracker.getInProgressDirPath()));
-    Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class), 
Mockito.any(Path.class));
-    Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile), 
Mockito.any(Path.class));
-    // Ensure no listStatus() is called
-    Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
-
-    // Verify operation was successful
-    assertTrue("markInProgress should be successful", result.isPresent());
-
-    // Verify original file no longer exists
-    assertFalse("Original file should no longer exist", 
localFs.exists(originalFile));
+    try {
+      // Create a file in a shard directory (without UUID)
+      ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+      List<Path> allShardPaths = shardManager.getAllShardPaths();
+      Path shardPath = allShardPaths.get(0);
+      localFs.mkdirs(shardPath);
 
-    // Verify file was moved to in-progress directory with UUID
-    Path inProgressDir = tracker.getInProgressDirPath();
-    FileStatus[] files = localFs.listStatus(inProgressDir);
-    assertEquals("Should have exactly one file in in-progress directory", 1, 
files.length);
-
-    // Verify the new file has UUID format and is in in-progress directory
-    String newFileName = files[0].getPath().getName();
-    assertTrue("New file should have UUID suffix", newFileName.matches(
-      
"1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\\.plog"));
-
-    // Assert that renamed file is in in-progress directory
-    Path renamedFile = files[0].getPath();
-    assertTrue("Renamed file should be in in-progress directory",
-      
renamedFile.getParent().toUri().getPath().equals(tracker.getInProgressDirPath().toString()));
-
-    // Assert that renamed file has same prefix as original file
-    String originalFileName = originalFile.getName();
-    String originalPrefix = originalFileName.substring(0, 
originalFileName.lastIndexOf('.'));
-    assertTrue("Renamed file should have same prefix as original file",
-      newFileName.startsWith(originalPrefix + "_"));
+      // Create original file without UUID
+      Path originalFile = new Path(shardPath, "1704153600000_rs1.plog");
+      localFs.create(originalFile, true).close();
+
+      // Verify original file exists
+      assertTrue("Original file should exist", localFs.exists(originalFile));
+
+      // Call markInProgress
+      Optional<Path> result = tracker.markInProgress(originalFile);
+
+      // Verify file system operation counts
+      // markInProgress involves moving file from shard directory to 
in-progress directory
+      // It should call exists() for only in progress directory (during init), 
rename() to move file
+      Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class));
+      Mockito.verify(mockFs, 
times(1)).exists(Mockito.eq(tracker.getInProgressDirPath()));
+      Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class), 
Mockito.any(Path.class));
+      Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile), 
Mockito.any(Path.class));
+      // Ensure no listStatus() is called
+      Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
+
+      // Verify operation was successful
+      assertTrue("markInProgress should be successful", result.isPresent());
+
+      // Verify original file no longer exists
+      assertFalse("Original file should no longer exist", 
localFs.exists(originalFile));
+
+      // Verify file was moved to in-progress directory with UUID
+      Path inProgressDir = tracker.getInProgressDirPath();
+      FileStatus[] files = localFs.listStatus(inProgressDir);
+      assertEquals("Should have exactly one file in in-progress directory", 1, 
files.length);
+
+      // Verify the new file has UUID and rename timestamp format:
+      // <ts>_<server>_<UUID>_<renameTs>.plog
+      String newFileName = files[0].getPath().getName();
+      assertTrue("New file should have UUID and rename timestamp",
+        newFileName
+          
.matches("1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"
+            + "_[0-9]+\\.plog"));
+
+      // Assert that renamed file is in in-progress directory
+      Path renamedFile = files[0].getPath();
+      assertTrue("Renamed file should be in in-progress directory", 
renamedFile.getParent().toUri()
+        .getPath().equals(tracker.getInProgressDirPath().toString()));
+
+      // Assert that renamed file has same prefix as original file
+      String originalFileName = originalFile.getName();
+      String originalPrefix = originalFileName.substring(0, 
originalFileName.lastIndexOf('.'));
+      assertTrue("Renamed file should have same prefix as original file",
+        newFileName.startsWith(originalPrefix + "_"));
+
+      // Verify rename timestamp matches the injected current time exactly
+      Optional<Long> renameTimestamp = tracker.getRenameTimestamp(renamedFile);
+      assertTrue("Rename timestamp should be present", 
renameTimestamp.isPresent());
+      assertEquals("Rename timestamp should match the current time", fixedTime,
+        renameTimestamp.get().longValue());
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
   }
 
   @Test
@@ -690,49 +710,70 @@ public class ReplicationLogTrackerTest {
     // Initialize tracker
     tracker.init();
 
-    // Create a file in in-progress directory with existing UUID
-    Path inProgressDir = tracker.getInProgressDirPath();
-    String existingUUID = "12345678-1234-1234-1234-123456789abc";
-    Path originalFile = new Path(inProgressDir, "1704153600000_rs1_" + 
existingUUID + ".plog");
-    localFs.create(originalFile, true).close();
-
-    // Verify original file exists
-    assertTrue("Original file should exist", localFs.exists(originalFile));
-
-    // Call markInProgress
-    Optional<Path> result = tracker.markInProgress(originalFile);
-
-    // Verify file system operation counts
-    // markInProgress involves re-naming file int the in-progress directory
-    // It should call exists() for only in progress directory (during init), 
rename() to move file
-    Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class));
-    Mockito.verify(mockFs, 
times(1)).exists(Mockito.eq(tracker.getInProgressDirPath()));
-    Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class), 
Mockito.any(Path.class));
-    Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile), 
Mockito.any(Path.class));
-    // Ensure no listStatus() is called
-    Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
-
-    // Verify operation was successful
-    assertTrue("markInProgress should be successful", result.isPresent());
-
-    // Verify original file no longer exists
-    assertFalse("Original file should no longer exist", 
localFs.exists(originalFile));
-
-    // Verify new file exists in same directory with new UUID
-    FileStatus[] files = localFs.listStatus(inProgressDir);
-    assertEquals("Should have exactly one file in in-progress directory", 1, 
files.length);
-
-    // Verify the new file has different UUID
-    String newFileName = files[0].getPath().getName();
-    assertTrue("New file should have UUID suffix", newFileName.matches(
-      
"1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\\.plog"));
-    assertFalse("New file should have different UUID", 
newFileName.contains(existingUUID));
-
-    // Assert that renamed file has same prefix as original file
-    String originalFileName = originalFile.getName();
-    String originalPrefix = originalFileName.substring(0, 
originalFileName.lastIndexOf('_'));
-    assertTrue("Renamed file should have same prefix as original file",
-      newFileName.startsWith(originalPrefix + "_"));
+    // Inject a fixed time that is newer than the existing rename timestamp
+    long existingRenameTimestamp = 1704153660000L;
+    long fixedTime = 1704153800000L;
+    EnvironmentEdge edge = () -> fixedTime;
+    EnvironmentEdgeManager.injectEdge(edge);
+
+    try {
+      // Create a file in in-progress directory with existing UUID and rename 
timestamp
+      Path inProgressDir = tracker.getInProgressDirPath();
+      String existingUUID = "12345678-1234-1234-1234-123456789abc";
+      Path originalFile = new Path(inProgressDir,
+        "1704153600000_rs1_" + existingUUID + "_" + existingRenameTimestamp + 
".plog");
+      localFs.create(originalFile, true).close();
+
+      // Verify original file exists
+      assertTrue("Original file should exist", localFs.exists(originalFile));
+
+      // Call markInProgress
+      Optional<Path> result = tracker.markInProgress(originalFile);
+
+      // Verify file system operation counts
+      // markInProgress involves re-naming file in the in-progress directory
+      // It should call exists() for only in progress directory (during init), 
rename() to move file
+      Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class));
+      Mockito.verify(mockFs, 
times(1)).exists(Mockito.eq(tracker.getInProgressDirPath()));
+      Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class), 
Mockito.any(Path.class));
+      Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile), 
Mockito.any(Path.class));
+      // Ensure no listStatus() is called
+      Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
+
+      // Verify operation was successful
+      assertTrue("markInProgress should be successful", result.isPresent());
+
+      // Verify original file no longer exists
+      assertFalse("Original file should no longer exist", 
localFs.exists(originalFile));
+
+      // Verify new file exists in same directory with new UUID and new rename 
timestamp
+      FileStatus[] files = localFs.listStatus(inProgressDir);
+      assertEquals("Should have exactly one file in in-progress directory", 1, 
files.length);
+
+      // Verify the new file has different UUID and a rename timestamp:
+      // <ts>_<server>_<UUID>_<renameTs>.plog
+      String newFileName = files[0].getPath().getName();
+      assertTrue("New file should have UUID and rename timestamp",
+        newFileName
+          
.matches("1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"
+            + "_[0-9]+\\.plog"));
+      assertFalse("New file should have different UUID", 
newFileName.contains(existingUUID));
+
+      // Assert that renamed file has same prefix (ts_server) as original file
+      String expectedPrefix = "1704153600000_rs1";
+      assertTrue("Renamed file should have same prefix as original file",
+        newFileName.startsWith(expectedPrefix + "_"));
+
+      // Verify rename timestamp is updated to the current (injected) time, 
not the old one
+      Optional<Long> renameTimestamp = 
tracker.getRenameTimestamp(files[0].getPath());
+      assertTrue("Rename timestamp should be present", 
renameTimestamp.isPresent());
+      assertEquals("Rename timestamp should match the current time (not the 
old value)", fixedTime,
+        renameTimestamp.get().longValue());
+      assertTrue("Rename timestamp should be newer than the original",
+        renameTimestamp.get() > existingRenameTimestamp);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
   }
 
   @Test
@@ -1090,9 +1131,9 @@ public class ReplicationLogTrackerTest {
     Optional<String> newFileUUID = tracker.getFileUUID(newFile);
     assertFalse("New file without UUID should return empty Optional", 
newFileUUID.isPresent());
 
-    // 2. For in-progress file - with UUID
+    // 2. For in-progress file - with UUID and rename timestamp
     Path inProgressFile = new Path(tracker.getInProgressDirPath(),
-      "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog");
+      
"1704153600000_rs1_12345678-1234-1234-1234-123456789abc_1704153660000.plog");
     Optional<String> inProgressFileUUID = tracker.getFileUUID(inProgressFile);
     assertTrue("In-progress file with UUID should return present Optional",
       inProgressFileUUID.isPresent());
@@ -1101,12 +1142,19 @@ public class ReplicationLogTrackerTest {
 
     // Test with different UUID
     Path anotherInProgressFile = new Path(tracker.getInProgressDirPath(),
-      "1704153600000_rs1_87654321-4321-4321-4321-cba987654321.plog");
+      
"1704153600000_rs1_87654321-4321-4321-4321-cba987654321_1704153660000.plog");
     Optional<String> anotherUUID = tracker.getFileUUID(anotherInProgressFile);
     assertTrue("Another in-progress file with UUID should return present 
Optional",
       anotherUUID.isPresent());
     assertEquals("Another in-progress file UUID should be extracted correctly",
       "87654321-4321-4321-4321-cba987654321", anotherUUID.get());
+
+    // 3. Old format without rename timestamp (3 parts) should return empty
+    Path oldFormatFile = new Path(tracker.getInProgressDirPath(),
+      "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog");
+    Optional<String> oldFormatUUID = tracker.getFileUUID(oldFormatFile);
+    assertFalse("Old format file without rename timestamp should return empty",
+      oldFormatUUID.isPresent());
   }
 
   @Test
@@ -1117,31 +1165,36 @@ public class ReplicationLogTrackerTest {
     // Get the in-progress directory path
     Path inProgressDir = tracker.getInProgressDirPath();
 
-    // Create files with different timestamps
-    long baseTimestamp = 1704153600000L; // 2024-01-02 00:00:00
-    long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1); // 1 
hour later
+    // Create files with different rename timestamps
+    long baseRenameTimestamp = 1704153600000L; // 2024-01-02 00:00:00
+    long thresholdTimestamp = baseRenameTimestamp + 
TimeUnit.HOURS.toMillis(1); // 1 hour later
+    String uuid = "12345678-1234-1234-1234-123456789abc";
 
-    // Files older than threshold (should be returned)
+    // Files with rename timestamps older than threshold (should be returned)
+    long oldRename1 = baseRenameTimestamp + TimeUnit.MINUTES.toMillis(30);
+    long oldRename2 = baseRenameTimestamp + TimeUnit.MINUTES.toMillis(45);
     Path oldFile1 =
-      new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(30)) 
+ "_rs1.plog");
+      new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + oldRename1 + 
".plog");
     Path oldFile2 =
-      new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(45)) 
+ "_rs2.plog");
+      new Path(inProgressDir, "1704153600000_rs2_" + uuid + "_" + oldRename2 + 
".plog");
 
-    // Files newer than threshold (should not be returned)
+    // Files with rename timestamps newer than threshold (should not be 
returned)
+    long newRename1 = baseRenameTimestamp + TimeUnit.HOURS.toMillis(2);
+    long newRename2 = baseRenameTimestamp + TimeUnit.HOURS.toMillis(3);
     Path newFile1 =
-      new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(2)) + 
"_rs3.plog");
+      new Path(inProgressDir, "1704153600000_rs3_" + uuid + "_" + newRename1 + 
".plog");
     Path newFile2 =
-      new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(3)) + 
"_rs4.plog");
+      new Path(inProgressDir, "1704153600000_rs4_" + uuid + "_" + newRename2 + 
".plog");
 
-    // Invalid files (should be skipped)
-    Path invalidFile = new Path(inProgressDir, "invalid_timestamp_rs5.plog");
+    // Files without rename timestamp (should be skipped)
+    Path noRenameFile = new Path(inProgressDir, "1704153600000_rs5_" + uuid + 
".plog");
 
     // Create all files
     localFs.create(oldFile1, true).close();
     localFs.create(oldFile2, true).close();
     localFs.create(newFile1, true).close();
     localFs.create(newFile2, true).close();
-    localFs.create(invalidFile, true).close();
+    localFs.create(noRenameFile, true).close();
 
     // Call getOlderInProgressFiles
     List<Path> result = tracker.getOlderInProgressFiles(thresholdTimestamp);
@@ -1159,7 +1212,8 @@ public class ReplicationLogTrackerTest {
     assertTrue("Should contain oldFile2", 
resultFilenames.contains(oldFile2.getName()));
     assertFalse("Should not contain newFile1", 
resultFilenames.contains(newFile1.getName()));
     assertFalse("Should not contain newFile2", 
resultFilenames.contains(newFile2.getName()));
-    assertFalse("Should not contain invalidFile", 
resultFilenames.contains(invalidFile.getName()));
+    assertFalse("Should not contain noRenameFile",
+      resultFilenames.contains(noRenameFile.getName()));
   }
 
   @Test
@@ -1169,15 +1223,18 @@ public class ReplicationLogTrackerTest {
 
     // Get the in-progress directory path
     Path inProgressDir = tracker.getInProgressDirPath();
+    String uuid = "12345678-1234-1234-1234-123456789abc";
 
-    // Create files all newer than threshold
+    // Create files with rename timestamps all newer than threshold
     long baseTimestamp = 1704153600000L;
     long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1);
 
+    long newRename1 = baseTimestamp + TimeUnit.HOURS.toMillis(2);
+    long newRename2 = baseTimestamp + TimeUnit.HOURS.toMillis(3);
     Path newFile1 =
-      new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(2)) + 
"_rs1.plog");
+      new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + newRename1 + 
".plog");
     Path newFile2 =
-      new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(3)) + 
"_rs2.plog");
+      new Path(inProgressDir, "1704153600000_rs2_" + uuid + "_" + newRename2 + 
".plog");
 
     localFs.create(newFile1, true).close();
     localFs.create(newFile2, true).close();
@@ -1236,22 +1293,27 @@ public class ReplicationLogTrackerTest {
 
     // Get the in-progress directory path
     Path inProgressDir = tracker.getInProgressDirPath();
+    String uuid = "12345678-1234-1234-1234-123456789abc";
 
     long baseTimestamp = 1704153600000L;
     long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1);
 
-    // Valid old file (should be returned)
+    // Valid old file with rename timestamp (should be returned)
+    long oldRename = baseTimestamp + TimeUnit.MINUTES.toMillis(30);
     Path validOldFile =
-      new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(30)) 
+ "_rs1.plog");
+      new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + oldRename + 
".plog");
 
-    // Invalid files (should be skipped)
-    Path invalidFile1 = new Path(inProgressDir, "invalid_timestamp_rs2.plog");
-    Path invalidFile2 = new Path(inProgressDir, "not_a_timestamp_rs3.plog");
-    Path invalidFile3 = new Path(inProgressDir, "1704153600000_rs4.txt"); // 
wrong extension
+    // Files without rename timestamp (should be skipped - only 3 parts)
+    Path noRenameFile = new Path(inProgressDir, "1704153600000_rs2_" + uuid + 
".plog");
+    // File with non-numeric rename timestamp (should be skipped)
+    Path invalidRenameFile =
+      new Path(inProgressDir, "1704153600000_rs3_" + uuid + 
"_notanumber.plog");
+    // Wrong extension (should be skipped by isValidLogFile)
+    Path invalidFile3 = new Path(inProgressDir, "1704153600000_rs4.txt");
 
     localFs.create(validOldFile, true).close();
-    localFs.create(invalidFile1, true).close();
-    localFs.create(invalidFile2, true).close();
+    localFs.create(noRenameFile, true).close();
+    localFs.create(invalidRenameFile, true).close();
     localFs.create(invalidFile3, true).close();
 
     // Call getOlderInProgressFiles
@@ -1264,10 +1326,10 @@ public class ReplicationLogTrackerTest {
     Set<String> resultFilenames = 
result.stream().map(Path::getName).collect(Collectors.toSet());
 
     assertTrue("Should contain validOldFile", 
resultFilenames.contains(validOldFile.getName()));
-    assertFalse("Should not contain invalidFile1",
-      resultFilenames.contains(invalidFile1.getName()));
-    assertFalse("Should not contain invalidFile2",
-      resultFilenames.contains(invalidFile2.getName()));
+    assertFalse("Should not contain noRenameFile",
+      resultFilenames.contains(noRenameFile.getName()));
+    assertFalse("Should not contain invalidRenameFile",
+      resultFilenames.contains(invalidRenameFile.getName()));
     assertFalse("Should not contain invalidFile3",
       resultFilenames.contains(invalidFile3.getName()));
   }
@@ -1279,16 +1341,18 @@ public class ReplicationLogTrackerTest {
 
     // Get the in-progress directory path
     Path inProgressDir = tracker.getInProgressDirPath();
+    String uuid = "12345678-1234-1234-1234-123456789abc";
 
     long baseTimestamp = 1704153600000L;
     long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1);
 
-    // File with timestamp exactly at threshold (should NOT be returned - we 
want older than
-    // threshold)
-    Path fileAtThreshold = new Path(inProgressDir, thresholdTimestamp + 
"_rs1.plog");
+    // File with rename timestamp exactly at threshold (should NOT be returned)
+    Path fileAtThreshold =
+      new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + 
thresholdTimestamp + ".plog");
 
-    // File with timestamp just before threshold (should be returned)
-    Path fileJustBeforeThreshold = new Path(inProgressDir, (thresholdTimestamp 
- 1) + "_rs2.plog");
+    // File with rename timestamp just before threshold (should be returned)
+    Path fileJustBeforeThreshold = new Path(inProgressDir,
+      "1704153600000_rs2_" + uuid + "_" + (thresholdTimestamp - 1) + ".plog");
 
     localFs.create(fileAtThreshold, true).close();
     localFs.create(fileJustBeforeThreshold, true).close();
@@ -1315,30 +1379,36 @@ public class ReplicationLogTrackerTest {
 
     // Get the in-progress directory path
     Path inProgressDir = tracker.getInProgressDirPath();
+    String uuid = "12345678-1234-1234-1234-123456789abc";
 
     long baseTimestamp = 1704153600000L;
     long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1);
 
-    // Valid old files (should be returned)
+    // Valid old files with rename timestamps before threshold (should be 
returned)
+    long oldRename1 = baseTimestamp + TimeUnit.MINUTES.toMillis(30);
+    long oldRename2 = baseTimestamp + TimeUnit.MINUTES.toMillis(45);
     Path oldFile1 =
-      new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(30)) 
+ "_rs1.plog");
+      new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + oldRename1 + 
".plog");
     Path oldFile2 =
-      new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(45)) 
+ "_rs2.plog");
+      new Path(inProgressDir, "1704153600000_rs2_" + uuid + "_" + oldRename2 + 
".plog");
 
-    // Valid new files (should not be returned)
+    // Valid file with rename timestamp after threshold (should not be 
returned)
+    long newRename1 = baseTimestamp + TimeUnit.HOURS.toMillis(2);
     Path newFile1 =
-      new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(2)) + 
"_rs3.plog");
+      new Path(inProgressDir, "1704153600000_rs3_" + uuid + "_" + newRename1 + 
".plog");
 
-    // Invalid files (should be skipped)
-    Path invalidFile1 = new Path(inProgressDir, "invalid_timestamp_rs4.plog");
-    Path invalidFile2 = new Path(inProgressDir, "1704153600000_rs5.txt"); // 
wrong extension
-    Path invalidFile3 = new Path(inProgressDir, "not_a_number_rs6.plog");
+    // Files without rename timestamp (should be skipped - only 3 parts)
+    Path noRenameFile = new Path(inProgressDir, "1704153600000_rs4_" + uuid + 
".plog");
+    // Wrong extension (should be skipped by isValidLogFile)
+    Path invalidFile2 = new Path(inProgressDir, "1704153600000_rs5.txt");
+    // Non-numeric rename timestamp (should be skipped)
+    Path invalidFile3 = new Path(inProgressDir, "1704153600000_rs6_" + uuid + 
"_notanumber.plog");
 
     // Create all files
     localFs.create(oldFile1, true).close();
     localFs.create(oldFile2, true).close();
     localFs.create(newFile1, true).close();
-    localFs.create(invalidFile1, true).close();
+    localFs.create(noRenameFile, true).close();
     localFs.create(invalidFile2, true).close();
     localFs.create(invalidFile3, true).close();
 
@@ -1354,8 +1424,8 @@ public class ReplicationLogTrackerTest {
     assertTrue("Should contain oldFile1", 
resultFilenames.contains(oldFile1.getName()));
     assertTrue("Should contain oldFile2", 
resultFilenames.contains(oldFile2.getName()));
     assertFalse("Should not contain newFile1", 
resultFilenames.contains(newFile1.getName()));
-    assertFalse("Should not contain invalidFile1",
-      resultFilenames.contains(invalidFile1.getName()));
+    assertFalse("Should not contain noRenameFile",
+      resultFilenames.contains(noRenameFile.getName()));
     assertFalse("Should not contain invalidFile2",
       resultFilenames.contains(invalidFile2.getName()));
     assertFalse("Should not contain invalidFile3",

Reply via email to