This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new bc1e05878d PHOENIX-7785 Avoid infinite loops & duplicate file
processing for in progress directory (#2395)
bc1e05878d is described below
commit bc1e05878db6ec2fafc33dbc8718474a184fe6ab
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Sun Apr 12 01:56:23 2026 +0530
PHOENIX-7785 Avoid infinite loops & duplicate file processing for in
progress directory (#2395)
---
.../replication/ReplicationLogDiscovery.java | 74 +-
.../phoenix/replication/ReplicationLogTracker.java | 130 +--
.../replication/ReplicationLogDiscoveryTest.java | 940 +++++++++++----------
.../replication/ReplicationLogTrackerTest.java | 352 ++++----
4 files changed, 857 insertions(+), 639 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 56f40866a7..175de94f87 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -18,7 +18,9 @@
package org.apache.phoenix.replication;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -83,6 +85,25 @@ public abstract class ReplicationLogDiscovery {
*/
protected static final double DEFAULT_WAITING_BUFFER_PERCENTAGE = 15.0;
+ /**
+ * Configuration key for maximum number of retries per in-progress file
within a single processing
+ * round. Files that fail this many times are skipped for the rest of the
round.
+ */
+ public static final String REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY =
+ "phoenix.replication.in.progress.file.max.retries";
+
+ public static final int DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES = 1;
+
+ /**
+ * Configuration key for the minimum age (in seconds) of an in-progress
file's rename timestamp
+ * before it becomes eligible for processing. This prevents a file recently
marked in-progress by
+ * one region server from being immediately picked up by another.
+ */
+ public static final String REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY =
+ "phoenix.replication.in.progress.file.min.age.seconds";
+
+ public static final int DEFAULT_IN_PROGRESS_FILE_MIN_AGE_SECONDS = 60;
+
protected final Configuration conf;
protected final String haGroupName;
protected final ReplicationLogTracker replicationLogTracker;
@@ -283,8 +304,9 @@ public abstract class ReplicationLogDiscovery {
}
/**
- * Processes all files (older than 1 round time) in the in-progress
directory. Continuously
- * processes files until no in-progress files remain.
+ * Processes all files in the in-progress directory whose rename timestamp
is older than the
+ * configured minimum age. Continuously processes files until no eligible
in-progress files
+ * remain.
* @throws IOException if there's an error during file processing
*/
protected void processInProgressDirectory() throws IOException {
@@ -293,16 +315,31 @@ public abstract class ReplicationLogDiscovery {
// Increase the count for number of times in progress directory is
processed
getMetrics().incrementNumInProgressDirectoryProcessed();
long startTime = EnvironmentEdgeManager.currentTime();
- long oldestTimestampToProcess =
-
replicationLogTracker.getReplicationShardDirectoryManager().getNearestRoundStartTimestamp(
- EnvironmentEdgeManager.currentTime()) - getReplayIntervalSeconds() *
1000L;
- List<Path> files =
replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
- LOG.info("Number of {} files with oldestTimestampToProcess {} is {} for
haGroup: {}",
- replicationLogTracker.getInProgressLogSubDirectoryName(),
oldestTimestampToProcess,
+ long renameTimestampThreshold =
+ EnvironmentEdgeManager.currentTime() - getInProgressFileMinAgeSeconds()
* 1000L;
+ int maxRetries = getInProgressFileMaxRetries();
+ Map<String, Integer> failureCount = new HashMap<>();
+ List<Path> files =
replicationLogTracker.getOlderInProgressFiles(renameTimestampThreshold);
+ LOG.info("Number of {} files with renameTimestampThreshold {} is {} for
haGroup: {}",
+ replicationLogTracker.getInProgressLogSubDirectoryName(),
renameTimestampThreshold,
files.size(), haGroupName);
while (!files.isEmpty()) {
- processOneRandomFile(files);
- files =
replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
+ Optional<Path> failedFile = processOneRandomFile(files);
+ if (failedFile.isPresent()) {
+ String prefix = replicationLogTracker.getFilePrefix(failedFile.get());
+ int count = failureCount.merge(prefix, 1, Integer::sum);
+ if (count >= maxRetries) {
+ LOG.warn(
+ "File {} (prefix: {}) has failed {} time(s), reached max retries
({}). "
+ + "Skipping for the rest of this round for haGroup: {}",
+ failedFile.get(), prefix, count, maxRetries, haGroupName);
+ }
+ }
+ renameTimestampThreshold =
+ EnvironmentEdgeManager.currentTime() -
getInProgressFileMinAgeSeconds() * 1000L;
+ files =
replicationLogTracker.getOlderInProgressFiles(renameTimestampThreshold);
+ files.removeIf(
+ f -> failureCount.getOrDefault(replicationLogTracker.getFilePrefix(f),
0) >= maxRetries);
}
long duration = EnvironmentEdgeManager.currentTime() - startTime;
LOG.info("Finished in-progress files processing in {}ms for haGroup: {}",
duration,
@@ -314,8 +351,9 @@ public abstract class ReplicationLogDiscovery {
* Processes a single random file from the provided list. Marks the file as
in-progress, processes
* it, and marks it as completed or failed.
* @param files - List of files from which to select and process one randomly
+ * @return the original path of the file that failed, or empty if processing
succeeded
*/
- private void processOneRandomFile(final List<Path> files) throws IOException
{
+ private Optional<Path> processOneRandomFile(final List<Path> files) throws
IOException {
// Pick a random file and process it
Path file = files.get(ThreadLocalRandom.current().nextInt(files.size()));
Optional<Path> optionalInProgressFilePath = Optional.empty();
@@ -329,9 +367,9 @@ public abstract class ReplicationLogDiscovery {
LOG.error("Failed to process the file {}", file, exception);
optionalInProgressFilePath.ifPresent(replicationLogTracker::markFailed);
// Not throwing this exception because next time another random file
will be retried.
- // If it's persistent failure for in_progress directory,
- // cluster state should to be DEGRADED_STANDBY_FOR_READER.
+ return Optional.of(file);
}
+ return Optional.empty();
}
/**
@@ -469,6 +507,16 @@ public abstract class ReplicationLogDiscovery {
return DEFAULT_WAITING_BUFFER_PERCENTAGE;
}
+ public int getInProgressFileMaxRetries() {
+ return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
+ DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES);
+ }
+
+ public int getInProgressFileMinAgeSeconds() {
+ return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
+ DEFAULT_IN_PROGRESS_FILE_MIN_AGE_SECONDS);
+ }
+
public ReplicationLogTracker getReplicationLogFileTracker() {
return this.replicationLogTracker;
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
index 03832a4d85..b6a7a105e1 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -203,15 +203,15 @@ public class ReplicationLogTracker {
}
/**
- * Retrieves all valid log files in the in-progress directory that are older
than the specified
- * timestamp.
- * @param timestampThreshold - The timestamp threshold in milliseconds.
Files with timestamps less
- * than this value will be returned.
- * @return List of valid log file paths in the in-progress directory that
are older than the
+ * Retrieves all valid log files in the in-progress directory whose rename
timestamp is older than
+ * the specified threshold. Files without a rename timestamp are skipped.
+ * @param renameTimestampThreshold - The timestamp threshold in
milliseconds. Files with rename
+ * timestamps less than this value will be
returned.
+ * @return List of valid log file paths in the in-progress directory that
were renamed before the
* threshold, empty list if directory doesn't exist or no files match
* @throws IOException if there's an error accessing the file system
*/
- public List<Path> getOlderInProgressFiles(long timestampThreshold) throws
IOException {
+ public List<Path> getOlderInProgressFiles(long renameTimestampThreshold)
throws IOException {
if (!fileSystem.exists(getInProgressDirPath())) {
return Collections.emptyList();
}
@@ -221,20 +221,19 @@ public class ReplicationLogTracker {
for (FileStatus status : fileStatuses) {
if (status.isFile() && isValidLogFile(status.getPath())) {
- try {
- long fileTimestamp = getFileTimestamp(status.getPath());
- if (fileTimestamp < timestampThreshold) {
- olderInProgressFiles.add(status.getPath());
- }
- } catch (NumberFormatException e) {
- LOG.warn("Failed to extract timestamp from file {}, skipping",
- status.getPath().getName());
+ Optional<Long> renameTimestamp = getRenameTimestamp(status.getPath());
+ if (!renameTimestamp.isPresent()) {
+ LOG.warn("File {} has no rename timestamp, skipping",
status.getPath().getName());
+ continue;
+ }
+ if (renameTimestamp.get() < renameTimestampThreshold) {
+ olderInProgressFiles.add(status.getPath());
}
}
}
- LOG.debug("Found {} in-progress files older than timestamp {}",
olderInProgressFiles.size(),
- timestampThreshold);
+ LOG.debug("Found {} in-progress files renamed before timestamp {}",
olderInProgressFiles.size(),
+ renameTimestampThreshold);
return olderInProgressFiles;
}
@@ -267,7 +266,7 @@ public class ReplicationLogTracker {
* @return true if file was successfully deleted, false otherwise
*/
protected boolean markCompleted(final Path file) {
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ long startTime = EnvironmentEdgeManager.currentTime();
// Increment the metrics count
getMetrics().incrementMarkFileCompletedRequestCount();
@@ -282,7 +281,7 @@ public class ReplicationLogTracker {
try {
if (fileSystem.delete(fileToDelete, false)) {
LOG.info("Successfully deleted completed file: {}", fileToDelete);
- long endTime = EnvironmentEdgeManager.currentTimeMillis();
+ long endTime = EnvironmentEdgeManager.currentTime();
getMetrics().updateMarkFileCompletedTime(endTime - startTime);
return true;
} else {
@@ -323,13 +322,13 @@ public class ReplicationLogTracker {
} else if (matchingFiles.size() > 1) {
LOG.warn("Multiple matching in-progress files found for prefix {}:
{}", filePrefix,
matchingFiles.size());
- long endTime = EnvironmentEdgeManager.currentTimeMillis();
+ long endTime = EnvironmentEdgeManager.currentTime();
getMetrics().updateMarkFileCompletedTime(endTime - startTime);
return false;
} else {
LOG.warn("No matching in-progress file found for prefix: {}. File
must " + "have "
+ "been deleted by some other process.", filePrefix);
- long endTime = EnvironmentEdgeManager.currentTimeMillis();
+ long endTime = EnvironmentEdgeManager.currentTime();
getMetrics().updateMarkFileCompletedTime(endTime - startTime);
return true;
}
@@ -341,7 +340,7 @@ public class ReplicationLogTracker {
}
}
- long endTime = EnvironmentEdgeManager.currentTimeMillis();
+ long endTime = EnvironmentEdgeManager.currentTime();
getMetrics().updateMarkFileCompletedTime(endTime - startTime);
LOG.error("Failed to delete file after {} attempts: {}", maxRetries + 1,
fileToDelete);
@@ -355,7 +354,7 @@ public class ReplicationLogTracker {
* @return Optional value of renamed path if file rename was successful,
else Optional.empty()
*/
protected Optional<Path> markInProgress(final Path file) {
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ long startTime = EnvironmentEdgeManager.currentTime();
try {
final String fileName = file.getName();
@@ -364,26 +363,20 @@ public class ReplicationLogTracker {
// Check if file is already in in-progress directory
if
(file.getParent().toUri().getPath().equals(getInProgressDirPath().toString())) {
- // File is already in in-progress directory, replace UUID with a new
one
+ // File is already in in-progress directory, replace UUID and rename
timestamp
// keep the directory same as in progress
+ // Format: <ts>_<server>_<UUID>_<renameTs>.plog → extract prefix
(first 2 parts)
String[] parts = fileName.split("_");
- // Remove the last part (UUID) and add new UUID
- StringBuilder newNameBuilder = new StringBuilder();
- for (int i = 0; i < parts.length - 1; i++) {
- if (i > 0) {
- newNameBuilder.append("_");
- }
- newNameBuilder.append(parts[i]);
- }
- String extension = fileName.substring(fileName.lastIndexOf("."));
- newNameBuilder.append("_").append(UUID.randomUUID()).append(extension);
- newFileName = newNameBuilder.toString();
+ String prefix = parts[0] + "_" + parts[1];
+ newFileName =
+ prefix + "_" + UUID.randomUUID() + "_" +
EnvironmentEdgeManager.currentTime() + ".plog";
targetDirectory = file.getParent();
} else {
- // File is not in in-progress directory, add UUID and move to
IN_PROGRESS directory
+ // File is not in in-progress directory, add UUID + rename timestamp
and move to
+ // IN_PROGRESS directory
String baseName = fileName.substring(0, fileName.lastIndexOf("."));
- String extension = fileName.substring(fileName.lastIndexOf("."));
- newFileName = baseName + "_" + UUID.randomUUID() + extension;
+ newFileName =
+ baseName + "_" + UUID.randomUUID() + "_" +
EnvironmentEdgeManager.currentTime() + ".plog";
targetDirectory = getInProgressDirPath();
}
@@ -401,7 +394,7 @@ public class ReplicationLogTracker {
} finally {
// Update the metrics
getMetrics().incrementMarkFileInProgressRequestCount();
- long endTime = EnvironmentEdgeManager.currentTimeMillis();
+ long endTime = EnvironmentEdgeManager.currentTime();
getMetrics().updateMarkFileInProgressTime(endTime - startTime);
}
}
@@ -427,42 +420,55 @@ public class ReplicationLogTracker {
}
/**
- * Extracts the UUID from a log file name. Assumes UUID is the last part of
the filename before
- * the extension.
+ * Extracts the UUID from an in-progress file name. Format:
<ts>_<server>_<UUID>_<renameTs>.plog →
+ * UUID is the third underscore-separated part.
* @param file - The file path to extract UUID from.
- * @return Optional of UUID if file was in progress, else Optional.empty()
+ * @return Optional of UUID if file is in-progress format, else
Optional.empty()
*/
protected Optional<String> getFileUUID(Path file) throws
NumberFormatException {
String[] parts = file.getName().split("_");
- if (parts.length < 3) {
+ if (parts.length < 4) {
return Optional.empty();
}
- return Optional.of(parts[parts.length - 1].split("\\.")[0]);
+ return Optional.of(parts[2]);
}
/**
- * Extracts everything except the UUID (last part) from a file path. For
example, from
- * "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog" This method
will return
- * "1704153600000_rs1"
- * @param file - The file path to extract prefix from.
+ * Extracts the rename timestamp from an in-progress file name. Format:
+ * <ts>_<server>_<UUID>_<renameTs>.plog → renameTs is the last
underscore-separated part (before
+ * extension).
+ * @param file - The file path to extract rename timestamp from.
+ * @return Optional of rename timestamp, or empty if not present or invalid
*/
- protected String getFilePrefix(Path file) {
- String fileName = file.getName();
- String[] parts = fileName.split("_");
- if (parts.length < 3) {
- return fileName.split("\\.")[0]; // Return full filename if no
underscore found
+ public Optional<Long> getRenameTimestamp(Path file) {
+ String[] parts = file.getName().split("_");
+ if (parts.length < 4) {
+ return Optional.empty();
}
-
- // Return everything except the last part (UUID)
- StringBuilder prefix = new StringBuilder();
- for (int i = 0; i < parts.length - 1; i++) {
- if (i > 0) {
- prefix.append("_");
- }
- prefix.append(parts[i]);
+ try {
+ String lastPart = parts[parts.length - 1];
+ String withoutExtension = lastPart.substring(0,
lastPart.lastIndexOf("."));
+ return Optional.of(Long.parseLong(withoutExtension));
+ } catch (NumberFormatException e) {
+ LOG.warn("Failed to parse rename timestamp from file {}",
file.getName());
+ return Optional.empty();
}
+ }
- return prefix.toString();
+ /**
+ * Extracts the stable prefix from a file path. The prefix is the first two
underscore-separated
+ * parts (<timestamp>_<servername>), which remain stable across renames. For
example,
+ * "1704153600000_rs1_uuid_renameTs.plog" returns "1704153600000_rs1" and
"1704153600000_rs1.plog"
+ * also returns "1704153600000_rs1".
+ * @param file - The file path to extract prefix from.
+ */
+ protected String getFilePrefix(Path file) {
+ String[] parts = file.getName().split("_");
+ if (parts.length < 2) {
+ return file.getName().split("\\.")[0];
+ }
+ String secondPart = parts[1].contains(".") ? parts[1].split("\\.")[0] :
parts[1];
+ return parts[0] + "_" + secondPart;
}
/**
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
index 412c832305..caacf175ce 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
@@ -36,7 +36,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -90,6 +92,7 @@ public class ReplicationLogDiscoveryTest {
replicationShardDirectoryManager));
fileTracker.init();
+
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
0);
discovery = Mockito.spy(new TestableReplicationLogDiscovery(fileTracker));
Mockito.doReturn(metricsLogDiscovery).when(discovery).getMetrics();
}
@@ -156,6 +159,24 @@ public class ReplicationLogDiscoveryTest {
assertFalse("Discovery should not be running after stop",
discovery.isRunning());
}
+ @Test
+ public void testGetInProgressFileMaxRetries() {
+ // Default value
+ assertEquals("Default maxRetries should be 1",
+ ReplicationLogDiscovery.DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES,
+ discovery.getInProgressFileMaxRetries());
+
+ // Custom value from configuration
+
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
5);
+ assertEquals("maxRetries should reflect configured value", 5,
+ discovery.getInProgressFileMaxRetries());
+
+ // Reset to verify it reads from conf each time
+
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
3);
+ assertEquals("maxRetries should reflect updated configured value", 3,
+ discovery.getInProgressFileMaxRetries());
+ }
+
/**
* Tests processRound with in-progress directory processing enabled.
Validates that both new files
* and in-progress files are processed correctly.
@@ -215,14 +236,14 @@ public class ReplicationLogDiscoveryTest {
// For in-progress files, they already have format:
{timestamp}_{rs-n}_{UUID}.plog
// Extract prefix before the UUID (everything before the last underscore)
String fileName = file.getName();
- String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
+ String prefix = extractPrefix(fileName);
expectedProcessedFilePrefixes.add(prefix);
}
for (Path file : inProgressFiles0102) {
// For in-progress files, they already have format:
{timestamp}_{rs-n}_{UUID}.plog
// Extract prefix before the UUID (everything before the last underscore)
String fileName = file.getName();
- String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
+ String prefix = extractPrefix(fileName);
expectedProcessedFilePrefixes.add(prefix);
}
@@ -231,12 +252,7 @@ public class ReplicationLogDiscoveryTest {
Set<String> actualProcessedFilePrefixes = new HashSet<>();
for (Path file : processedFiles) {
String fileName = file.getName();
- // Extract prefix before UUID and extension (everything before the last
underscore before
- // .plog)
- // Remove the extension first
- String withoutExtension = fileName.substring(0,
fileName.lastIndexOf("."));
- // Then get everything before the last underscore (which is the UUID)
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
+ String prefix = extractPrefix(fileName);
actualProcessedFilePrefixes.add(prefix);
}
@@ -256,16 +272,14 @@ public class ReplicationLogDiscoveryTest {
}
// For in-progress files
for (Path expectedFile : inProgressFiles0004) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markInProgress(Mockito.argThat(path -> path
- .getName().substring(0,
path.getName().lastIndexOf("_")).equals(expectedPrefix)));
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
for (Path expectedFile : inProgressFiles0102) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markInProgress(Mockito.argThat(path -> path
- .getName().substring(0,
path.getName().lastIndexOf("_")).equals(expectedPrefix)));
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that markCompleted was called 7 times (once for each
successfully processed file)
@@ -276,33 +290,19 @@ public class ReplicationLogDiscoveryTest {
for (Path expectedFile : newFilesForRound) {
String expectedPrefix =
expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("."));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// For in-progress files (they will have updated UUIDs, but same prefix)
for (Path expectedFile : inProgressFiles0004) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
for (Path expectedFile : inProgressFiles0102) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that shouldProcessInProgressDirectory was called once
@@ -388,8 +388,7 @@ public class ReplicationLogDiscoveryTest {
Set<String> actualProcessedFilePrefixes = new HashSet<>();
for (Path file : processedFiles) {
String fileName = file.getName();
- String withoutExtension = fileName.substring(0,
fileName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
+ String prefix = extractPrefix(fileName);
actualProcessedFilePrefixes.add(prefix);
}
@@ -413,12 +412,8 @@ public class ReplicationLogDiscoveryTest {
for (Path expectedFile : newFilesForRound) {
String expectedPrefix =
expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("."));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that shouldProcessInProgressDirectory was called once
@@ -448,16 +443,14 @@ public class ReplicationLogDiscoveryTest {
// Validate that in-progress files were NOT processed
for (Path unexpectedFile : inProgressFiles0004) {
- String unexpectedPrefix =
- unexpectedFile.getName().substring(0,
unexpectedFile.getName().lastIndexOf("_"));
+ String unexpectedPrefix = extractPrefix(unexpectedFile.getName());
assertFalse(
"Should NOT have processed in-progress file from 00:00:04: " +
unexpectedFile.getName(),
actualProcessedFilePrefixes.contains(unexpectedPrefix));
}
for (Path unexpectedFile : inProgressFiles0102) {
- String unexpectedPrefix =
- unexpectedFile.getName().substring(0,
unexpectedFile.getName().lastIndexOf("_"));
+ String unexpectedPrefix = extractPrefix(unexpectedFile.getName());
assertFalse(
"Should NOT have processed in-progress file from 00:01:02: " +
unexpectedFile.getName(),
actualProcessedFilePrefixes.contains(unexpectedPrefix));
@@ -560,8 +553,7 @@ public class ReplicationLogDiscoveryTest {
Set<String> actualProcessedFilePrefixes = new HashSet<>();
for (Path file : processedFiles) {
String fileName = file.getName();
- String withoutExtension = fileName.substring(0,
fileName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
+ String prefix = extractPrefix(fileName);
actualProcessedFilePrefixes.add(prefix);
}
@@ -585,12 +577,8 @@ public class ReplicationLogDiscoveryTest {
for (Path expectedFile : newFilesForRound) {
String expectedPrefix =
expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("."));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Validate that files from other rounds were NOT processed (using prefix
comparison)
@@ -618,16 +606,14 @@ public class ReplicationLogDiscoveryTest {
// Validate that in-progress files were NOT processed
(processNewFilesForRound only processes
// new files)
for (Path unexpectedFile : inProgressFiles0004) {
- String unexpectedPrefix =
- unexpectedFile.getName().substring(0,
unexpectedFile.getName().lastIndexOf("_"));
+ String unexpectedPrefix = extractPrefix(unexpectedFile.getName());
assertFalse(
"Should NOT have processed in-progress file from 00:00:04: " +
unexpectedFile.getName(),
actualProcessedFilePrefixes.contains(unexpectedPrefix));
}
for (Path unexpectedFile : inProgressFiles0102) {
- String unexpectedPrefix =
- unexpectedFile.getName().substring(0,
unexpectedFile.getName().lastIndexOf("_"));
+ String unexpectedPrefix = extractPrefix(unexpectedFile.getName());
assertFalse(
"Should NOT have processed in-progress file from 00:01:02: " +
unexpectedFile.getName(),
actualProcessedFilePrefixes.contains(unexpectedPrefix));
@@ -651,21 +637,11 @@ public class ReplicationLogDiscoveryTest {
String file1Prefix = newFilesForRound.get(1).getName().substring(0,
newFilesForRound.get(1).getName().lastIndexOf("."));
Mockito.doThrow(new IOException("Processing failed for file
1")).when(discovery)
- .processFile(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(file1Prefix);
- }));
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
String file3Prefix = newFilesForRound.get(3).getName().substring(0,
newFilesForRound.get(3).getName().lastIndexOf("."));
Mockito.doThrow(new IOException("Processing failed for file
3")).when(discovery)
- .processFile(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(file3Prefix);
- }));
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file3Prefix)));
// Process new files for the round
discovery.processNewFilesForRound(replicationRound);
@@ -686,12 +662,8 @@ public class ReplicationLogDiscoveryTest {
for (Path expectedFile : newFilesForRound) {
String expectedPrefix =
expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("."));
- Mockito.verify(discovery,
Mockito.times(1)).processFile(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that markCompleted was called for each successfully processed
file
@@ -700,80 +672,40 @@ public class ReplicationLogDiscoveryTest {
// Verify that markCompleted was called for each successfully processed
file with correct paths
String expectedPrefix0 = newFilesForRound.get(0).getName().substring(0,
newFilesForRound.get(0).getName().lastIndexOf("."));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix0);
- }));
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix0)));
String expectedPrefix2 = newFilesForRound.get(2).getName().substring(0,
newFilesForRound.get(2).getName().lastIndexOf("."));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix2);
- }));
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix2)));
String expectedPrefix4 = newFilesForRound.get(4).getName().substring(0,
newFilesForRound.get(4).getName().lastIndexOf("."));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix4);
- }));
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix4)));
// Verify that markCompleted was NOT called for failed files
String unexpectedPrefix1 = newFilesForRound.get(1).getName().substring(0,
newFilesForRound.get(1).getName().lastIndexOf("."));
- Mockito.verify(fileTracker,
Mockito.never()).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(unexpectedPrefix1);
- }));
+ Mockito.verify(fileTracker, Mockito.never()).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(unexpectedPrefix1)));
String unexpectedPrefix3 = newFilesForRound.get(3).getName().substring(0,
newFilesForRound.get(3).getName().lastIndexOf("."));
- Mockito.verify(fileTracker,
Mockito.never()).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(unexpectedPrefix3);
- }));
+ Mockito.verify(fileTracker, Mockito.never()).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(unexpectedPrefix3)));
// Verify that markFailed was called for failed files
- Mockito.verify(fileTracker,
Mockito.times(1)).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(unexpectedPrefix1);
- }));
- Mockito.verify(fileTracker,
Mockito.times(1)).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(unexpectedPrefix3);
- }));
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(unexpectedPrefix1)));
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(unexpectedPrefix3)));
// Verify that markFailed was NOT called for successfully processed files
- Mockito.verify(fileTracker,
Mockito.never()).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix0);
- }));
- Mockito.verify(fileTracker,
Mockito.never()).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix2);
- }));
- Mockito.verify(fileTracker,
Mockito.never()).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix4);
- }));
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix0)));
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix2)));
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix4)));
}
/**
@@ -793,12 +725,8 @@ public class ReplicationLogDiscoveryTest {
for (Path file : newFilesForRound) {
String filePrefix = file.getName().substring(0,
file.getName().lastIndexOf("."));
Mockito.doThrow(new IOException("Processing failed for file: " +
file.getName()))
- .when(discovery).processFile(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(filePrefix);
- }));
+ .when(discovery)
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(filePrefix)));
}
// Process new files for the round
@@ -820,12 +748,8 @@ public class ReplicationLogDiscoveryTest {
for (Path expectedFile : newFilesForRound) {
String expectedPrefix =
expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("."));
- Mockito.verify(discovery,
Mockito.times(1)).processFile(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that markCompleted was NOT called for any file (all failed)
@@ -838,12 +762,8 @@ public class ReplicationLogDiscoveryTest {
for (Path failedFile : newFilesForRound) {
String expectedPrefix =
failedFile.getName().substring(0,
failedFile.getName().lastIndexOf("."));
- Mockito.verify(fileTracker,
Mockito.times(1)).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
}
@@ -853,140 +773,143 @@ public class ReplicationLogDiscoveryTest {
*/
@Test
public void testProcessInProgressDirectory() throws IOException {
- // 1. Create in-progress files for different timestamps
- long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
- List<Path> inProgressFiles0004 = createInProgressFiles(timestamp0004, 3);
+ // Inject an advancing clock so we can verify rename timestamps are recent
+ long initialTime = 1704153800000L;
+ long originalRenameTimestamp = 1704153604000L;
+ AtomicLong clock = new AtomicLong(initialTime);
+ EnvironmentEdge edge = clock::getAndIncrement;
+ EnvironmentEdgeManager.injectEdge(edge);
- long timestamp0102 = 1704153660000L + (2 * 1000L); // 00:01:02
- List<Path> inProgressFiles0102 = createInProgressFiles(timestamp0102, 2);
+ try {
+ // 1. Create in-progress files for different timestamps with old rename
timestamps
+ long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
+ List<Path> inProgressFiles0004 =
+ createInProgressFiles(timestamp0004, 3, originalRenameTimestamp);
- long timestamp0206 = 1704153720000L + (6 * 1000L); // 00:02:06
- List<Path> inProgressFiles0206 = createInProgressFiles(timestamp0206, 2);
+ long timestamp0102 = 1704153660000L + (2 * 1000L); // 00:01:02
+ List<Path> inProgressFiles0102 =
+ createInProgressFiles(timestamp0102, 2, originalRenameTimestamp);
- // 2. Create some new files to ensure they are NOT processed
- ReplicationRound replicationRound = new ReplicationRound(1704153600000L,
1704153660000L); // 00:00:00
-
// -
-
// 00:01:00
- List<Path> newFilesForRound = createNewFilesForRound(replicationRound, 3);
+ long timestamp0206 = 1704153720000L + (6 * 1000L); // 00:02:06
+ List<Path> inProgressFiles0206 =
+ createInProgressFiles(timestamp0206, 2, originalRenameTimestamp);
- // Process in-progress directory
- discovery.processInProgressDirectory();
+ // 2. Create some new files to ensure they are NOT processed
+ ReplicationRound replicationRound = new ReplicationRound(1704153600000L,
1704153660000L); // 00:00:00
+
// -
+
// 00:01:00
+ List<Path> newFilesForRound = createNewFilesForRound(replicationRound,
3);
- // 3. Ensure all in-progress files (7 total) are processed
- List<Path> processedFiles = discovery.getProcessedFiles();
- assertEquals("Invalid number of files processed", 7,
processedFiles.size());
+ // Process in-progress directory
+ discovery.processInProgressDirectory();
- // Create set of expected files that should be processed (by prefix, since
UUIDs are updated
- // during markInProgress)
- Set<String> expectedProcessedFilePrefixes = new HashSet<>();
- for (Path file : inProgressFiles0004) {
- String fileName = file.getName();
- String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
- expectedProcessedFilePrefixes.add(prefix);
- }
- for (Path file : inProgressFiles0102) {
- String fileName = file.getName();
- String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
- expectedProcessedFilePrefixes.add(prefix);
- }
- for (Path file : inProgressFiles0206) {
- String fileName = file.getName();
- String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
- expectedProcessedFilePrefixes.add(prefix);
- }
+ // 3. Ensure all in-progress files (7 total) are processed
+ List<Path> processedFiles = discovery.getProcessedFiles();
+ assertEquals("Invalid number of files processed", 7,
processedFiles.size());
- // Create set of actually processed file paths (extract prefixes)
- Set<String> actualProcessedFilePrefixes = new HashSet<>();
- for (Path file : processedFiles) {
- String fileName = file.getName();
- String withoutExtension = fileName.substring(0,
fileName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- actualProcessedFilePrefixes.add(prefix);
- }
+ // Create set of expected files that should be processed (by prefix,
since UUIDs are updated
+ // during markInProgress)
+ Set<String> expectedProcessedFilePrefixes = new HashSet<>();
+ for (Path file : inProgressFiles0004) {
+ String fileName = file.getName();
+ String prefix = extractPrefix(fileName);
+ expectedProcessedFilePrefixes.add(prefix);
+ }
+ for (Path file : inProgressFiles0102) {
+ String fileName = file.getName();
+ String prefix = extractPrefix(fileName);
+ expectedProcessedFilePrefixes.add(prefix);
+ }
+ for (Path file : inProgressFiles0206) {
+ String fileName = file.getName();
+ String prefix = extractPrefix(fileName);
+ expectedProcessedFilePrefixes.add(prefix);
+ }
- // Validate that sets are equal
- assertEquals("Expected and actual processed files should match",
expectedProcessedFilePrefixes,
- actualProcessedFilePrefixes);
+ // Create set of actually processed file paths (extract prefixes)
+ Set<String> actualProcessedFilePrefixes = new HashSet<>();
+ for (Path file : processedFiles) {
+ String fileName = file.getName();
+ String prefix = extractPrefix(fileName);
+ actualProcessedFilePrefixes.add(prefix);
+ }
- // Verify that markInProgress was called 7 times
- Mockito.verify(fileTracker,
Mockito.times(7)).markInProgress(Mockito.any(Path.class));
+ // Validate that sets are equal
+ assertEquals("Expected and actual processed files should match",
+ expectedProcessedFilePrefixes, actualProcessedFilePrefixes);
- // Verify that markInProgress was called for each expected file
- for (Path expectedFile : inProgressFiles0004) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markInProgress(Mockito.argThat(path -> {
- String pathName = path.getName();
- String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
- }
- for (Path expectedFile : inProgressFiles0102) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markInProgress(Mockito.argThat(path -> {
- String pathName = path.getName();
- String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
- }
- for (Path expectedFile : inProgressFiles0206) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markInProgress(Mockito.argThat(path -> {
- String pathName = path.getName();
- String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
- }
+ // Verify that markInProgress was called 7 times
+ Mockito.verify(fileTracker,
Mockito.times(7)).markInProgress(Mockito.any(Path.class));
- // Verify that markCompleted was called for each processed file
- Mockito.verify(fileTracker,
Mockito.times(7)).markCompleted(Mockito.any(Path.class));
+ // Verify that markInProgress was called for each expected file
+ for (Path expectedFile : inProgressFiles0004) {
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ }
+ for (Path expectedFile : inProgressFiles0102) {
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ }
+ for (Path expectedFile : inProgressFiles0206) {
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ }
- // Verify that markCompleted was called for each processed file with
correct paths
- for (Path expectedFile : inProgressFiles0004) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
- }
- for (Path expectedFile : inProgressFiles0102) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
- }
- for (Path expectedFile : inProgressFiles0206) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
- }
+ // Verify that markCompleted was called for each processed file
+ Mockito.verify(fileTracker,
Mockito.times(7)).markCompleted(Mockito.any(Path.class));
- // Validate that new files were NOT processed (processInProgressDirectory
only processes
- // in-progress files)
- for (Path unexpectedFile : newFilesForRound) {
- String unexpectedPrefix =
- unexpectedFile.getName().substring(0,
unexpectedFile.getName().lastIndexOf("."));
- assertFalse("Should NOT have processed new file: " +
unexpectedFile.getName(),
- actualProcessedFilePrefixes.contains(unexpectedPrefix));
+ // Verify that markCompleted was called for each processed file with
correct paths
+ for (Path expectedFile : inProgressFiles0004) {
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ }
+ for (Path expectedFile : inProgressFiles0102) {
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ }
+ for (Path expectedFile : inProgressFiles0206) {
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ }
+
+ // Validate that new files were NOT processed
(processInProgressDirectory only processes
+ // in-progress files)
+ for (Path unexpectedFile : newFilesForRound) {
+ String unexpectedPrefix =
+ unexpectedFile.getName().substring(0,
unexpectedFile.getName().lastIndexOf("."));
+ assertFalse("Should NOT have processed new file: " +
unexpectedFile.getName(),
+ actualProcessedFilePrefixes.contains(unexpectedPrefix));
+ }
+
+ // Verify that all processed files had rename timestamps updated to
recent values
+ // (not the original old rename timestamp)
+ for (Path processedFile : processedFiles) {
+ Optional<Long> renameTs =
fileTracker.getRenameTimestamp(processedFile);
+ assertTrue("Processed file should have a rename timestamp: " +
processedFile.getName(),
+ renameTs.isPresent());
+ assertTrue("Processed file's rename timestamp should be recent (>=
initialTime), but was "
+ + renameTs.get() + " for " + processedFile.getName(), renameTs.get()
>= initialTime);
+ assertTrue(
+ "Processed file's rename timestamp should be newer than the original
("
+ + originalRenameTimestamp + "), but was " + renameTs.get(),
+ renameTs.get() > originalRenameTimestamp);
+ }
+ } finally {
+ EnvironmentEdgeManager.reset();
}
}
@Test
public void testProcessInProgressDirectoryWithIntermittentFailure() throws
IOException {
+ // Allow 2 retries so files that fail once can succeed on retry
+
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
2);
+
// Create in-progress files for different timestamps
long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
List<Path> inProgressFiles0004 = createInProgressFiles(timestamp0004, 3);
@@ -1001,24 +924,14 @@ public class ReplicationLogDiscoveryTest {
// Mock processFile to throw exception for specific files (files 1 and 3)
only on first call,
// succeed on retry
- String file1Prefix = allInProgressFiles.get(1).getName().substring(0,
- allInProgressFiles.get(1).getName().lastIndexOf("_"));
+ String file1Prefix = extractPrefix(allInProgressFiles.get(1).getName());
Mockito.doThrow(new IOException("Processing failed for file
1")).doCallRealMethod()
- .when(discovery).processFile(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(file1Prefix);
- }));
- String file3Prefix = allInProgressFiles.get(3).getName().substring(0,
- allInProgressFiles.get(3).getName().lastIndexOf("_"));
+ .when(discovery)
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+ String file3Prefix = extractPrefix(allInProgressFiles.get(3).getName());
Mockito.doThrow(new IOException("Processing failed for file
3")).doCallRealMethod()
- .when(discovery).processFile(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(file3Prefix);
- }));
+ .when(discovery)
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file3Prefix)));
// Process in-progress directory
discovery.processInProgressDirectory();
@@ -1030,15 +943,10 @@ public class ReplicationLogDiscoveryTest {
// Files 1 and 3 are called twice (initial attempt + retry), others once
for (int i = 0; i < allInProgressFiles.size(); i++) {
Path expectedFile = allInProgressFiles.get(i);
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
+ String expectedPrefix = extractPrefix(expectedFile.getName());
int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are
retried
- Mockito.verify(fileTracker, Mockito.times(expectedTimes))
- .markInProgress(Mockito.argThat(path -> {
- String pathName = path.getName();
- String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that processFile was called for each file in the directory (i.e.
5 + 2 times for
@@ -1049,16 +957,11 @@ public class ReplicationLogDiscoveryTest {
// Files 1 and 3 should be called twice (fail once, succeed on retry),
others once
for (int i = 0; i < allInProgressFiles.size(); i++) {
Path expectedFile = allInProgressFiles.get(i);
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
+ String expectedPrefix = extractPrefix(expectedFile.getName());
int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are
called twice (fail +
// retry success)
- Mockito.verify(discovery,
Mockito.times(expectedTimes)).processFile(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ Mockito.verify(discovery, Mockito.times(expectedTimes))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that markCompleted was called for each successfully processed
file
@@ -1068,80 +971,272 @@ public class ReplicationLogDiscoveryTest {
Mockito.verify(fileTracker,
Mockito.times(2)).markFailed(Mockito.any(Path.class));
// Verify that markFailed was called once ONLY for failed files
- String failedPrefix1 = allInProgressFiles.get(1).getName().substring(0,
- allInProgressFiles.get(1).getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(failedPrefix1);
- }));
- String failedPrefix3 = allInProgressFiles.get(3).getName().substring(0,
- allInProgressFiles.get(3).getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(failedPrefix3);
- }));
+ String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName());
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(failedPrefix1)));
+ String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName());
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(failedPrefix3)));
// Verify that markFailed was NOT called for files processed successfully
in first iteration
- String successPrefix0 = allInProgressFiles.get(0).getName().substring(0,
- allInProgressFiles.get(0).getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.never()).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(successPrefix0);
- }));
- String successPrefix2 = allInProgressFiles.get(2).getName().substring(0,
- allInProgressFiles.get(2).getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.never()).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(successPrefix2);
- }));
- String successPrefix4 = allInProgressFiles.get(4).getName().substring(0,
- allInProgressFiles.get(4).getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.never()).markFailed(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(successPrefix4);
- }));
+ String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName());
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(successPrefix0)));
+ String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName());
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(successPrefix2)));
+ String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName());
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(successPrefix4)));
// Verify that markCompleted was called for each successfully processed
file with correct paths
for (Path expectedFile : allInProgressFiles) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ }
+ }
+
+ /**
+ * Tests that with default maxRetries=1, files that fail processing are
skipped for the rest of
+ * the round and not retried. Verifies that successfully processed files are
completed, while
+ * failed files are only marked as failed without retry.
+ */
+ @Test
+ public void testProcessInProgressDirectorySkipsFailedFiles() throws
IOException {
+ // Use default maxRetries=1 (no explicit config override)
+ // Create in-progress files
+ long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
+ List<Path> inProgressFiles = createInProgressFiles(timestamp0004, 3);
+
+ // Mock processFile to always throw for file 1 (persistent failure)
+ String file1Prefix = extractPrefix(inProgressFiles.get(1).getName());
+ Mockito.doThrow(new IOException("Persistent failure for file
1")).when(discovery)
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+
+ // Process in-progress directory
+ discovery.processInProgressDirectory();
+
+ // With maxRetries=1, file 1 should fail once and be skipped (no retry)
+ // Files 0 and 2 should succeed
+ // Total markInProgress calls: 3 (one per file, no retries)
+ Mockito.verify(fileTracker,
Mockito.times(3)).markInProgress(Mockito.any(Path.class));
+
+ // processFile called 3 times (all files attempted once)
+ Mockito.verify(discovery,
Mockito.times(3)).processFile(Mockito.any(Path.class));
+
+ // markCompleted called only for the 2 successful files
+ Mockito.verify(fileTracker,
Mockito.times(2)).markCompleted(Mockito.any(Path.class));
+
+ // markFailed called once for the failed file
+ Mockito.verify(fileTracker,
Mockito.times(1)).markFailed(Mockito.any(Path.class));
+
+ // Verify the failed file was NOT retried (processFile called exactly once
for file1)
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+
+ // Verify the successful files were each called exactly once
+ String file0Prefix = extractPrefix(inProgressFiles.get(0).getName());
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file0Prefix)));
+ String file2Prefix = extractPrefix(inProgressFiles.get(2).getName());
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file2Prefix)));
+ }
+
+ /**
+ * Tests that when all in-progress files fail processing, the loop
terminates without retrying any
+ * file (default maxRetries=1). Verifies no infinite loop occurs and all
files are marked as
+ * failed.
+ */
+ @Test
+ public void testProcessInProgressDirectoryAllFilesFail() throws IOException {
+ // Use default maxRetries=1
+ // Create in-progress files
+ long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
+ List<Path> inProgressFiles = createInProgressFiles(timestamp0004, 3);
+
+ String file0Prefix = extractPrefix(inProgressFiles.get(0).getName());
+ String file1Prefix = extractPrefix(inProgressFiles.get(1).getName());
+ String file2Prefix = extractPrefix(inProgressFiles.get(2).getName());
+
+ // Mock processFile to always throw for all files
+ Mockito.doThrow(new IOException("Persistent failure")).when(discovery)
+ .processFile(Mockito.any(Path.class));
+
+ // Process in-progress directory - should terminate without infinite loop
+ discovery.processInProgressDirectory();
+
+ // Each file attempted exactly once
+ Mockito.verify(fileTracker,
Mockito.times(3)).markInProgress(Mockito.any(Path.class));
+ Mockito.verify(discovery,
Mockito.times(3)).processFile(Mockito.any(Path.class));
+
+ // Verify per-prefix: each file attempted once
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file0Prefix)));
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file2Prefix)));
+
+ // All files marked as failed
+ Mockito.verify(fileTracker,
Mockito.times(3)).markFailed(Mockito.any(Path.class));
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file0Prefix)));
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file2Prefix)));
+
+ // No file completed
+ Mockito.verify(fileTracker,
Mockito.never()).markCompleted(Mockito.any(Path.class));
+ }
+
+ /**
+ * Tests that the failure tracking is scoped per invocation of
processInProgressDirectory. Files
+ * that fail in the first invocation are skipped for that round, but get a
fresh chance in the
+ * second invocation where they succeed.
+ */
+ @Test
+ public void testProcessInProgressDirectoryFailedFilesSucceedOnNextRound()
throws IOException {
+ // Use default maxRetries=1
+ // Inject an advancing clock so we can verify rename timestamps
+ long initialTime = 1704153800000L;
+ long originalRenameTimestamp = 1704153604000L;
+ AtomicLong clock = new AtomicLong(initialTime);
+ EnvironmentEdge edge = clock::getAndIncrement;
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ try {
+ // Create in-progress files with old rename timestamps
+ long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
+ List<Path> inProgressFiles = createInProgressFiles(timestamp0004, 3,
originalRenameTimestamp);
+
+ // Extract prefixes for verification
+ String file0Prefix = extractPrefix(inProgressFiles.get(0).getName());
+ String file1Prefix = extractPrefix(inProgressFiles.get(1).getName());
+ String file2Prefix = extractPrefix(inProgressFiles.get(2).getName());
+
+ // Verify original rename timestamp
+ for (Path file : inProgressFiles) {
+ Optional<Long> ts = fileTracker.getRenameTimestamp(file);
+ assertTrue("Original file should have rename timestamp",
ts.isPresent());
+ assertEquals("Original rename timestamp should match creation value",
+ originalRenameTimestamp, ts.get().longValue());
+ }
+
+ // Mock processFile to fail for file 1 only on the first call, succeed
on subsequent calls
+ Mockito.doThrow(new IOException("Transient failure for file
1")).doCallRealMethod()
+ .when(discovery)
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+
+ // --- First invocation: file 1 fails, files 0 and 2 succeed ---
+ discovery.processInProgressDirectory();
+
+ // Verify markInProgress called once per file (3 total, no retries)
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markInProgress(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file0Prefix)));
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markInProgress(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markInProgress(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file2Prefix)));
+
+ // Verify processFile called once per file
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file0Prefix)));
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+ Mockito.verify(discovery, Mockito.times(1))
+ .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file2Prefix)));
+
+ // Verify markCompleted called for files 0 and 2 (successful)
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markCompleted(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file0Prefix)));
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markCompleted(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file2Prefix)));
+
+ // Verify markFailed called only for file 1
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file0Prefix)));
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file2Prefix)));
+
+ // After first round: file 1 should still be in in-progress with an
UPDATED rename timestamp
+ // (not the original old one)
+ Path inProgressDir = fileTracker.getInProgressDirPath();
+ FileStatus[] remainingFiles = localFs.listStatus(inProgressDir);
+ assertEquals("Only file 1 should remain in in-progress after first
round", 1,
+ remainingFiles.length);
+ Path failedFilePath = remainingFiles[0].getPath();
+ assertEquals("Remaining file should have file 1's prefix", file1Prefix,
+ extractPrefix(failedFilePath.getName()));
+ Optional<Long> updatedRenameTs =
fileTracker.getRenameTimestamp(failedFilePath);
+ assertTrue("Failed file should have a rename timestamp",
updatedRenameTs.isPresent());
+ assertTrue("Failed file's rename timestamp should be newer than the
original",
+ updatedRenameTs.get() > originalRenameTimestamp);
+ assertTrue("Failed file's rename timestamp should be recent (>=
initialTime)",
+ updatedRenameTs.get() >= initialTime);
+ long firstRoundRenameTs = updatedRenameTs.get();
+
+ // --- Second invocation: file 1 should be retried and succeed ---
+ Mockito.clearInvocations(fileTracker, discovery);
+
+ discovery.processInProgressDirectory();
+
+ // Verify only file 1 was picked up (files 0 and 2 already completed)
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markInProgress(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+ Mockito.verify(fileTracker, Mockito.never())
+ .markInProgress(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file0Prefix)));
+ Mockito.verify(fileTracker, Mockito.never())
+ .markInProgress(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file2Prefix)));
+
+ // Verify processFile called only for file 1 and it succeeded
+ Mockito.verify(discovery,
Mockito.times(1)).processFile(Mockito.argThat(path -> {
+ if (!extractPrefix(path.getName()).equals(file1Prefix)) {
+ return false;
+ }
+ // Verify the path passed to processFile has a rename timestamp newer
than
+ // the first round's rename timestamp
+ Optional<Long> ts = fileTracker.getRenameTimestamp(path);
+ assertTrue("processFile path should have rename timestamp",
ts.isPresent());
+ assertTrue("Second round rename timestamp should be newer than first
round's",
+ ts.get() > firstRoundRenameTs);
+ return true;
}));
+
+ // Verify markCompleted called for file 1
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markCompleted(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file1Prefix)));
+
+ // Verify markFailed was not called in the second round
+ Mockito.verify(fileTracker,
Mockito.never()).markFailed(Mockito.any(Path.class));
+ } finally {
+ EnvironmentEdgeManager.reset();
}
}
/**
- * Tests processing of in-progress directory when no files meet the
timestamp criteria. Validates
- * that no files are processed when all files are too recent.
+ * Tests processing of in-progress directory when no files meet the rename
timestamp criteria.
+ * Validates that no files are processed when all files were recently
renamed.
*/
@Test
public void testProcessInProgressDirectoryWithNoOldFiles() throws
IOException {
- // Set up current time for consistent testing
+ // Set up current time and override min age to 60 seconds for this test
long currentTime = 1704153660000L; // 00:01:00
+
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
60);
EnvironmentEdge edge = () -> currentTime;
EnvironmentEdgeManager.injectEdge(edge);
try {
- // Create only recent files (all within the threshold)
- long recentTimestamp1 = 1704153655000L; // 00:00:55 (5 seconds old)
- long recentTimestamp2 = 1704153658000L; // 00:00:58 (2 seconds old)
+ // Create files with recent rename timestamps (within the 60-second min
age window)
+ long recentRename1 = currentTime - 5000L; // renamed 5 seconds ago
+ long recentRename2 = currentTime - 2000L; // renamed 2 seconds ago
- List<Path> recentFiles1 = createInProgressFiles(recentTimestamp1, 2);
- List<Path> recentFiles2 = createInProgressFiles(recentTimestamp2, 2);
+ List<Path> recentFiles1 = createInProgressFiles(1704153600000L, 2,
recentRename1);
+ List<Path> recentFiles2 = createInProgressFiles(1704153600000L, 2,
recentRename2);
// Process in-progress directory
discovery.processInProgressDirectory();
@@ -1149,39 +1244,40 @@ public class ReplicationLogDiscoveryTest {
// Get processed files
List<Path> processedFiles = discovery.getProcessedFiles();
- // Verify that no files were processed (all files are too recent)
- assertEquals("Should not process any files when all files are too
recent", 0,
+ // Verify that no files were processed (all rename timestamps are too
recent)
+ assertEquals("Should not process any files when all files were recently
renamed", 0,
processedFiles.size());
} finally {
EnvironmentEdgeManager.reset();
+
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
0);
}
}
/**
- * Tests processing of in-progress directory with timestamp filtering using
- * getOlderInProgressFiles. Validates that only files older than the
calculated threshold are
- * processed, excluding recent files.
+ * Tests processing of in-progress directory with rename timestamp
filtering. Validates that only
+ * files whose rename timestamp is older than the configured minimum age are
processed.
*/
@Test
public void testProcessInProgressDirectoryWithTimestampFiltering() throws
IOException {
- // Set up current time for consistent testing
+ // Set up current time and override min age to 60 seconds for this test
long currentTime = 1704153660000L; // 00:01:00
+
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
60);
EnvironmentEdge edge = () -> currentTime;
EnvironmentEdgeManager.injectEdge(edge);
try {
- // Create files with various ages
- long veryOldTimestamp = 1704153600000L; // 00:00:00 (1 minute old) -
should be processed
- long oldTimestamp = 1704153630000L; // 00:00:30 (30 seconds old) -
should be processed
- long recentTimestamp = 1704153655000L; // 00:00:55 (5 seconds old) -
should NOT be processed
- long veryRecentTimestamp = 1704153658000L; // 00:00:58 (2 seconds old) -
should NOT be
- // processed
-
- List<Path> veryOldFiles = createInProgressFiles(veryOldTimestamp, 1);
- List<Path> oldFiles = createInProgressFiles(oldTimestamp, 1);
- List<Path> recentFiles = createInProgressFiles(recentTimestamp, 1);
- List<Path> veryRecentFiles = createInProgressFiles(veryRecentTimestamp,
1);
+ // threshold = currentTime - 60s = 1704153600000
+ // Files renamed before threshold should be processed
+ long veryOldRename = currentTime - 120000L; // renamed 2 min ago -
should be processed
+ long oldRename = currentTime - 70000L; // renamed 70s ago - should be
processed
+ long recentRename = currentTime - 5000L; // renamed 5s ago - should NOT
be processed
+ long veryRecentRename = currentTime - 2000L; // renamed 2s ago - should
NOT be processed
+
+ List<Path> veryOldFiles = createInProgressFiles(1704153500000L, 1,
veryOldRename);
+ List<Path> oldFiles = createInProgressFiles(1704153500001L, 1,
oldRename);
+ List<Path> recentFiles = createInProgressFiles(1704153500002L, 1,
recentRename);
+ List<Path> veryRecentFiles = createInProgressFiles(1704153500003L, 1,
veryRecentRename);
// Process in-progress directory
discovery.processInProgressDirectory();
@@ -1190,7 +1286,7 @@ public class ReplicationLogDiscoveryTest {
List<Path> processedFiles = discovery.getProcessedFiles();
// Verify that only old files were processed (2 old files, 0 recent
files)
- assertEquals("Should process only old files based on timestamp
filtering", 2,
+ assertEquals("Should process only old files based on rename timestamp
filtering", 2,
processedFiles.size());
// Create set of expected processed files (by prefix, since UUIDs are
updated during
@@ -1198,12 +1294,12 @@ public class ReplicationLogDiscoveryTest {
Set<String> expectedProcessedFilePrefixes = new HashSet<>();
for (Path file : veryOldFiles) {
String fileName = file.getName();
- String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
+ String prefix = extractPrefix(fileName);
expectedProcessedFilePrefixes.add(prefix);
}
for (Path file : oldFiles) {
String fileName = file.getName();
- String prefix = fileName.substring(0, fileName.lastIndexOf("_"));
+ String prefix = extractPrefix(fileName);
expectedProcessedFilePrefixes.add(prefix);
}
@@ -1211,8 +1307,7 @@ public class ReplicationLogDiscoveryTest {
Set<String> actualProcessedFilePrefixes = new HashSet<>();
for (Path file : processedFiles) {
String fileName = file.getName();
- String withoutExtension = fileName.substring(0,
fileName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
+ String prefix = extractPrefix(fileName);
actualProcessedFilePrefixes.add(prefix);
}
@@ -1225,22 +1320,14 @@ public class ReplicationLogDiscoveryTest {
// Verify that markInProgress was called for each expected file
for (Path expectedFile : veryOldFiles) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markInProgress(Mockito.argThat(path -> {
- String pathName = path.getName();
- String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
for (Path expectedFile : oldFiles) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markInProgress(Mockito.argThat(path -> {
- String pathName = path.getName();
- String prefix = pathName.substring(0, pathName.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that markCompleted was called 2 times
@@ -1248,42 +1335,31 @@ public class ReplicationLogDiscoveryTest {
// Verify that markCompleted was called for each expected file with
correct paths
for (Path expectedFile : veryOldFiles) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
for (Path expectedFile : oldFiles) {
- String expectedPrefix =
- expectedFile.getName().substring(0,
expectedFile.getName().lastIndexOf("_"));
- Mockito.verify(fileTracker,
Mockito.times(1)).markCompleted(Mockito.argThat(path -> {
- String pathName = path.getName();
- String withoutExtension = pathName.substring(0,
pathName.lastIndexOf("."));
- String prefix = withoutExtension.substring(0,
withoutExtension.lastIndexOf("_"));
- return prefix.equals(expectedPrefix);
- }));
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that recent files were NOT processed
for (Path file : recentFiles) {
- String unexpectedPrefix = file.getName().substring(0,
file.getName().lastIndexOf("_"));
- assertFalse(
- "Recent files should not be processed due to timestamp filtering: "
+ file.getName(),
- actualProcessedFilePrefixes.contains(unexpectedPrefix));
+ String unexpectedPrefix = extractPrefix(file.getName());
+ assertFalse("Recent files should not be processed due to rename
timestamp filtering: "
+ + file.getName(),
actualProcessedFilePrefixes.contains(unexpectedPrefix));
}
for (Path file : veryRecentFiles) {
- String unexpectedPrefix = file.getName().substring(0,
file.getName().lastIndexOf("_"));
- assertFalse(
- "Recent files should not be processed due to timestamp filtering: "
+ file.getName(),
- actualProcessedFilePrefixes.contains(unexpectedPrefix));
+ String unexpectedPrefix = extractPrefix(file.getName());
+ assertFalse("Recent files should not be processed due to rename
timestamp filtering: "
+ + file.getName(),
actualProcessedFilePrefixes.contains(unexpectedPrefix));
}
} finally {
EnvironmentEdgeManager.reset();
+
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
0);
}
}
@@ -1775,14 +1851,32 @@ public class ReplicationLogDiscoveryTest {
return newFiles;
}
+ /**
+ * Extracts the stable prefix (<timestamp>_<server>) from a filename. Works
for all formats:
+ * "ts_server.plog", "ts_server_UUID_renameTs.plog"
+ */
+ private static String extractPrefix(String fileName) {
+ String[] parts = fileName.split("_");
+ if (parts.length < 2) {
+ return fileName.split("\\.")[0];
+ }
+ String secondPart = parts[1].contains(".") ? parts[1].split("\\.")[0] :
parts[1];
+ return parts[0] + "_" + secondPart;
+ }
+
private List<Path> createInProgressFiles(long timestamp, int count) throws
IOException {
- // Create in-progress files
+ return createInProgressFiles(timestamp, count, timestamp);
+ }
+
+ private List<Path> createInProgressFiles(long timestamp, int count, long
renameTimestamp)
+ throws IOException {
Path inProgressDir = fileTracker.getInProgressDirPath();
localFs.mkdirs(inProgressDir);
List<Path> inProgressFiles = new ArrayList<>();
for (int i = 0; i < count; i++) {
String uuid = "12345678-1234-1234-1234-123456789abc" + i;
- Path inProgressFile = new Path(inProgressDir, timestamp + "_rs-" + i +
"_" + uuid + ".plog");
+ Path inProgressFile = new Path(inProgressDir,
+ timestamp + "_rs-" + i + "_" + uuid + "_" + renameTimestamp + ".plog");
localFs.create(inProgressFile, true).close();
inProgressFiles.add(inProgressFile);
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
index 2037b25c0e..6d6d8163a2 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker;
import
org.apache.phoenix.replication.metrics.MetricsReplicationLogTrackerReplayImpl;
import org.apache.phoenix.replication.reader.ReplicationLogReplay;
@@ -631,58 +633,76 @@ public class ReplicationLogTrackerTest {
// Initialize tracker
tracker.init();
- // Create a file in a shard directory (without UUID)
- ReplicationShardDirectoryManager shardManager =
tracker.getReplicationShardDirectoryManager();
- List<Path> allShardPaths = shardManager.getAllShardPaths();
- Path shardPath = allShardPaths.get(0);
- localFs.mkdirs(shardPath);
-
- // Create original file without UUID
- Path originalFile = new Path(shardPath, "1704153600000_rs1.plog");
- localFs.create(originalFile, true).close();
+ // Inject a fixed time so we can verify the exact rename timestamp
+ long fixedTime = 1704153700000L;
+ EnvironmentEdge edge = () -> fixedTime;
+ EnvironmentEdgeManager.injectEdge(edge);
- // Verify original file exists
- assertTrue("Original file should exist", localFs.exists(originalFile));
-
- // Call markInProgress
- Optional<Path> result = tracker.markInProgress(originalFile);
-
- // Verify file system operation counts
- // markInProgress involves moving file from shard directory to in-progress
directory
- // It should call exists() for only in progress directory (during init),
rename() to move file
- Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class));
- Mockito.verify(mockFs,
times(1)).exists(Mockito.eq(tracker.getInProgressDirPath()));
- Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class),
Mockito.any(Path.class));
- Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile),
Mockito.any(Path.class));
- // Ensure no listStatus() is called
- Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
-
- // Verify operation was successful
- assertTrue("markInProgress should be successful", result.isPresent());
-
- // Verify original file no longer exists
- assertFalse("Original file should no longer exist",
localFs.exists(originalFile));
+ try {
+ // Create a file in a shard directory (without UUID)
+ ReplicationShardDirectoryManager shardManager =
tracker.getReplicationShardDirectoryManager();
+ List<Path> allShardPaths = shardManager.getAllShardPaths();
+ Path shardPath = allShardPaths.get(0);
+ localFs.mkdirs(shardPath);
- // Verify file was moved to in-progress directory with UUID
- Path inProgressDir = tracker.getInProgressDirPath();
- FileStatus[] files = localFs.listStatus(inProgressDir);
- assertEquals("Should have exactly one file in in-progress directory", 1,
files.length);
-
- // Verify the new file has UUID format and is in in-progress directory
- String newFileName = files[0].getPath().getName();
- assertTrue("New file should have UUID suffix", newFileName.matches(
-
"1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\\.plog"));
-
- // Assert that renamed file is in in-progress directory
- Path renamedFile = files[0].getPath();
- assertTrue("Renamed file should be in in-progress directory",
-
renamedFile.getParent().toUri().getPath().equals(tracker.getInProgressDirPath().toString()));
-
- // Assert that renamed file has same prefix as original file
- String originalFileName = originalFile.getName();
- String originalPrefix = originalFileName.substring(0,
originalFileName.lastIndexOf('.'));
- assertTrue("Renamed file should have same prefix as original file",
- newFileName.startsWith(originalPrefix + "_"));
+ // Create original file without UUID
+ Path originalFile = new Path(shardPath, "1704153600000_rs1.plog");
+ localFs.create(originalFile, true).close();
+
+ // Verify original file exists
+ assertTrue("Original file should exist", localFs.exists(originalFile));
+
+ // Call markInProgress
+ Optional<Path> result = tracker.markInProgress(originalFile);
+
+ // Verify file system operation counts
+ // markInProgress involves moving file from shard directory to
in-progress directory
+ // It should call exists() for only in progress directory (during init),
rename() to move file
+ Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class));
+ Mockito.verify(mockFs,
times(1)).exists(Mockito.eq(tracker.getInProgressDirPath()));
+ Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class),
Mockito.any(Path.class));
+ Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile),
Mockito.any(Path.class));
+ // Ensure no listStatus() is called
+ Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
+
+ // Verify operation was successful
+ assertTrue("markInProgress should be successful", result.isPresent());
+
+ // Verify original file no longer exists
+ assertFalse("Original file should no longer exist",
localFs.exists(originalFile));
+
+ // Verify file was moved to in-progress directory with UUID
+ Path inProgressDir = tracker.getInProgressDirPath();
+ FileStatus[] files = localFs.listStatus(inProgressDir);
+ assertEquals("Should have exactly one file in in-progress directory", 1,
files.length);
+
+ // Verify the new file has UUID and rename timestamp format:
+ // <ts>_<server>_<UUID>_<renameTs>.plog
+ String newFileName = files[0].getPath().getName();
+ assertTrue("New file should have UUID and rename timestamp",
+ newFileName
+
.matches("1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"
+ + "_[0-9]+\\.plog"));
+
+ // Assert that renamed file is in in-progress directory
+ Path renamedFile = files[0].getPath();
+ assertTrue("Renamed file should be in in-progress directory",
renamedFile.getParent().toUri()
+ .getPath().equals(tracker.getInProgressDirPath().toString()));
+
+ // Assert that renamed file has same prefix as original file
+ String originalFileName = originalFile.getName();
+ String originalPrefix = originalFileName.substring(0,
originalFileName.lastIndexOf('.'));
+ assertTrue("Renamed file should have same prefix as original file",
+ newFileName.startsWith(originalPrefix + "_"));
+
+ // Verify rename timestamp matches the injected current time exactly
+ Optional<Long> renameTimestamp = tracker.getRenameTimestamp(renamedFile);
+ assertTrue("Rename timestamp should be present",
renameTimestamp.isPresent());
+ assertEquals("Rename timestamp should match the current time", fixedTime,
+ renameTimestamp.get().longValue());
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
}
@Test
@@ -690,49 +710,70 @@ public class ReplicationLogTrackerTest {
// Initialize tracker
tracker.init();
- // Create a file in in-progress directory with existing UUID
- Path inProgressDir = tracker.getInProgressDirPath();
- String existingUUID = "12345678-1234-1234-1234-123456789abc";
- Path originalFile = new Path(inProgressDir, "1704153600000_rs1_" +
existingUUID + ".plog");
- localFs.create(originalFile, true).close();
-
- // Verify original file exists
- assertTrue("Original file should exist", localFs.exists(originalFile));
-
- // Call markInProgress
- Optional<Path> result = tracker.markInProgress(originalFile);
-
- // Verify file system operation counts
- // markInProgress involves re-naming file int the in-progress directory
- // It should call exists() for only in progress directory (during init),
rename() to move file
- Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class));
- Mockito.verify(mockFs,
times(1)).exists(Mockito.eq(tracker.getInProgressDirPath()));
- Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class),
Mockito.any(Path.class));
- Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile),
Mockito.any(Path.class));
- // Ensure no listStatus() is called
- Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
-
- // Verify operation was successful
- assertTrue("markInProgress should be successful", result.isPresent());
-
- // Verify original file no longer exists
- assertFalse("Original file should no longer exist",
localFs.exists(originalFile));
-
- // Verify new file exists in same directory with new UUID
- FileStatus[] files = localFs.listStatus(inProgressDir);
- assertEquals("Should have exactly one file in in-progress directory", 1,
files.length);
-
- // Verify the new file has different UUID
- String newFileName = files[0].getPath().getName();
- assertTrue("New file should have UUID suffix", newFileName.matches(
-
"1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\\.plog"));
- assertFalse("New file should have different UUID",
newFileName.contains(existingUUID));
-
- // Assert that renamed file has same prefix as original file
- String originalFileName = originalFile.getName();
- String originalPrefix = originalFileName.substring(0,
originalFileName.lastIndexOf('_'));
- assertTrue("Renamed file should have same prefix as original file",
- newFileName.startsWith(originalPrefix + "_"));
+ // Inject a fixed time that is newer than the existing rename timestamp
+ long existingRenameTimestamp = 1704153660000L;
+ long fixedTime = 1704153800000L;
+ EnvironmentEdge edge = () -> fixedTime;
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ try {
+ // Create a file in in-progress directory with existing UUID and rename
timestamp
+ Path inProgressDir = tracker.getInProgressDirPath();
+ String existingUUID = "12345678-1234-1234-1234-123456789abc";
+ Path originalFile = new Path(inProgressDir,
+ "1704153600000_rs1_" + existingUUID + "_" + existingRenameTimestamp +
".plog");
+ localFs.create(originalFile, true).close();
+
+ // Verify original file exists
+ assertTrue("Original file should exist", localFs.exists(originalFile));
+
+ // Call markInProgress
+ Optional<Path> result = tracker.markInProgress(originalFile);
+
+ // Verify file system operation counts
+ // markInProgress involves re-naming file in the in-progress directory
+ // It should call exists() for only in progress directory (during init),
rename() to move file
+ Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class));
+ Mockito.verify(mockFs,
times(1)).exists(Mockito.eq(tracker.getInProgressDirPath()));
+ Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class),
Mockito.any(Path.class));
+ Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile),
Mockito.any(Path.class));
+ // Ensure no listStatus() is called
+ Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
+
+ // Verify operation was successful
+ assertTrue("markInProgress should be successful", result.isPresent());
+
+ // Verify original file no longer exists
+ assertFalse("Original file should no longer exist",
localFs.exists(originalFile));
+
+ // Verify new file exists in same directory with new UUID and new rename
timestamp
+ FileStatus[] files = localFs.listStatus(inProgressDir);
+ assertEquals("Should have exactly one file in in-progress directory", 1,
files.length);
+
+ // Verify the new file has different UUID and a rename timestamp:
+ // <ts>_<server>_<UUID>_<renameTs>.plog
+ String newFileName = files[0].getPath().getName();
+ assertTrue("New file should have UUID and rename timestamp",
+ newFileName
+
.matches("1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"
+ + "_[0-9]+\\.plog"));
+ assertFalse("New file should have different UUID",
newFileName.contains(existingUUID));
+
+ // Assert that renamed file has same prefix (ts_server) as original file
+ String expectedPrefix = "1704153600000_rs1";
+ assertTrue("Renamed file should have same prefix as original file",
+ newFileName.startsWith(expectedPrefix + "_"));
+
+ // Verify rename timestamp is updated to the current (injected) time,
not the old one
+ Optional<Long> renameTimestamp =
tracker.getRenameTimestamp(files[0].getPath());
+ assertTrue("Rename timestamp should be present",
renameTimestamp.isPresent());
+ assertEquals("Rename timestamp should match the current time (not the
old value)", fixedTime,
+ renameTimestamp.get().longValue());
+ assertTrue("Rename timestamp should be newer than the original",
+ renameTimestamp.get() > existingRenameTimestamp);
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
}
@Test
@@ -1090,9 +1131,9 @@ public class ReplicationLogTrackerTest {
Optional<String> newFileUUID = tracker.getFileUUID(newFile);
assertFalse("New file without UUID should return empty Optional",
newFileUUID.isPresent());
- // 2. For in-progress file - with UUID
+ // 2. For in-progress file - with UUID and rename timestamp
Path inProgressFile = new Path(tracker.getInProgressDirPath(),
- "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog");
+
"1704153600000_rs1_12345678-1234-1234-1234-123456789abc_1704153660000.plog");
Optional<String> inProgressFileUUID = tracker.getFileUUID(inProgressFile);
assertTrue("In-progress file with UUID should return present Optional",
inProgressFileUUID.isPresent());
@@ -1101,12 +1142,19 @@ public class ReplicationLogTrackerTest {
// Test with different UUID
Path anotherInProgressFile = new Path(tracker.getInProgressDirPath(),
- "1704153600000_rs1_87654321-4321-4321-4321-cba987654321.plog");
+
"1704153600000_rs1_87654321-4321-4321-4321-cba987654321_1704153660000.plog");
Optional<String> anotherUUID = tracker.getFileUUID(anotherInProgressFile);
assertTrue("Another in-progress file with UUID should return present
Optional",
anotherUUID.isPresent());
assertEquals("Another in-progress file UUID should be extracted correctly",
"87654321-4321-4321-4321-cba987654321", anotherUUID.get());
+
+ // 3. Old format without rename timestamp (3 parts) should return empty
+ Path oldFormatFile = new Path(tracker.getInProgressDirPath(),
+ "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog");
+ Optional<String> oldFormatUUID = tracker.getFileUUID(oldFormatFile);
+ assertFalse("Old format file without rename timestamp should return empty",
+ oldFormatUUID.isPresent());
}
@Test
@@ -1117,31 +1165,36 @@ public class ReplicationLogTrackerTest {
// Get the in-progress directory path
Path inProgressDir = tracker.getInProgressDirPath();
- // Create files with different timestamps
- long baseTimestamp = 1704153600000L; // 2024-01-02 00:00:00
- long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1); // 1
hour later
+ // Create files with different rename timestamps
+ long baseRenameTimestamp = 1704153600000L; // 2024-01-02 00:00:00
+ long thresholdTimestamp = baseRenameTimestamp +
TimeUnit.HOURS.toMillis(1); // 1 hour later
+ String uuid = "12345678-1234-1234-1234-123456789abc";
- // Files older than threshold (should be returned)
+ // Files with rename timestamps older than threshold (should be returned)
+ long oldRename1 = baseRenameTimestamp + TimeUnit.MINUTES.toMillis(30);
+ long oldRename2 = baseRenameTimestamp + TimeUnit.MINUTES.toMillis(45);
Path oldFile1 =
- new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(30))
+ "_rs1.plog");
+ new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + oldRename1 +
".plog");
Path oldFile2 =
- new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(45))
+ "_rs2.plog");
+ new Path(inProgressDir, "1704153600000_rs2_" + uuid + "_" + oldRename2 +
".plog");
- // Files newer than threshold (should not be returned)
+ // Files with rename timestamps newer than threshold (should not be
returned)
+ long newRename1 = baseRenameTimestamp + TimeUnit.HOURS.toMillis(2);
+ long newRename2 = baseRenameTimestamp + TimeUnit.HOURS.toMillis(3);
Path newFile1 =
- new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(2)) +
"_rs3.plog");
+ new Path(inProgressDir, "1704153600000_rs3_" + uuid + "_" + newRename1 +
".plog");
Path newFile2 =
- new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(3)) +
"_rs4.plog");
+ new Path(inProgressDir, "1704153600000_rs4_" + uuid + "_" + newRename2 +
".plog");
- // Invalid files (should be skipped)
- Path invalidFile = new Path(inProgressDir, "invalid_timestamp_rs5.plog");
+ // Files without rename timestamp (should be skipped)
+ Path noRenameFile = new Path(inProgressDir, "1704153600000_rs5_" + uuid +
".plog");
// Create all files
localFs.create(oldFile1, true).close();
localFs.create(oldFile2, true).close();
localFs.create(newFile1, true).close();
localFs.create(newFile2, true).close();
- localFs.create(invalidFile, true).close();
+ localFs.create(noRenameFile, true).close();
// Call getOlderInProgressFiles
List<Path> result = tracker.getOlderInProgressFiles(thresholdTimestamp);
@@ -1159,7 +1212,8 @@ public class ReplicationLogTrackerTest {
assertTrue("Should contain oldFile2",
resultFilenames.contains(oldFile2.getName()));
assertFalse("Should not contain newFile1",
resultFilenames.contains(newFile1.getName()));
assertFalse("Should not contain newFile2",
resultFilenames.contains(newFile2.getName()));
- assertFalse("Should not contain invalidFile",
resultFilenames.contains(invalidFile.getName()));
+ assertFalse("Should not contain noRenameFile",
+ resultFilenames.contains(noRenameFile.getName()));
}
@Test
@@ -1169,15 +1223,18 @@ public class ReplicationLogTrackerTest {
// Get the in-progress directory path
Path inProgressDir = tracker.getInProgressDirPath();
+ String uuid = "12345678-1234-1234-1234-123456789abc";
- // Create files all newer than threshold
+ // Create files with rename timestamps all newer than threshold
long baseTimestamp = 1704153600000L;
long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1);
+ long newRename1 = baseTimestamp + TimeUnit.HOURS.toMillis(2);
+ long newRename2 = baseTimestamp + TimeUnit.HOURS.toMillis(3);
Path newFile1 =
- new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(2)) +
"_rs1.plog");
+ new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + newRename1 +
".plog");
Path newFile2 =
- new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(3)) +
"_rs2.plog");
+ new Path(inProgressDir, "1704153600000_rs2_" + uuid + "_" + newRename2 +
".plog");
localFs.create(newFile1, true).close();
localFs.create(newFile2, true).close();
@@ -1236,22 +1293,27 @@ public class ReplicationLogTrackerTest {
// Get the in-progress directory path
Path inProgressDir = tracker.getInProgressDirPath();
+ String uuid = "12345678-1234-1234-1234-123456789abc";
long baseTimestamp = 1704153600000L;
long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1);
- // Valid old file (should be returned)
+ // Valid old file with rename timestamp (should be returned)
+ long oldRename = baseTimestamp + TimeUnit.MINUTES.toMillis(30);
Path validOldFile =
- new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(30))
+ "_rs1.plog");
+ new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + oldRename +
".plog");
- // Invalid files (should be skipped)
- Path invalidFile1 = new Path(inProgressDir, "invalid_timestamp_rs2.plog");
- Path invalidFile2 = new Path(inProgressDir, "not_a_timestamp_rs3.plog");
- Path invalidFile3 = new Path(inProgressDir, "1704153600000_rs4.txt"); //
wrong extension
+ // Files without rename timestamp (should be skipped - only 3 parts)
+ Path noRenameFile = new Path(inProgressDir, "1704153600000_rs2_" + uuid +
".plog");
+ // File with non-numeric rename timestamp (should be skipped)
+ Path invalidRenameFile =
+ new Path(inProgressDir, "1704153600000_rs3_" + uuid +
"_notanumber.plog");
+ // Wrong extension (should be skipped by isValidLogFile)
+ Path invalidFile3 = new Path(inProgressDir, "1704153600000_rs4.txt");
localFs.create(validOldFile, true).close();
- localFs.create(invalidFile1, true).close();
- localFs.create(invalidFile2, true).close();
+ localFs.create(noRenameFile, true).close();
+ localFs.create(invalidRenameFile, true).close();
localFs.create(invalidFile3, true).close();
// Call getOlderInProgressFiles
@@ -1264,10 +1326,10 @@ public class ReplicationLogTrackerTest {
Set<String> resultFilenames =
result.stream().map(Path::getName).collect(Collectors.toSet());
assertTrue("Should contain validOldFile",
resultFilenames.contains(validOldFile.getName()));
- assertFalse("Should not contain invalidFile1",
- resultFilenames.contains(invalidFile1.getName()));
- assertFalse("Should not contain invalidFile2",
- resultFilenames.contains(invalidFile2.getName()));
+ assertFalse("Should not contain noRenameFile",
+ resultFilenames.contains(noRenameFile.getName()));
+ assertFalse("Should not contain invalidRenameFile",
+ resultFilenames.contains(invalidRenameFile.getName()));
assertFalse("Should not contain invalidFile3",
resultFilenames.contains(invalidFile3.getName()));
}
@@ -1279,16 +1341,18 @@ public class ReplicationLogTrackerTest {
// Get the in-progress directory path
Path inProgressDir = tracker.getInProgressDirPath();
+ String uuid = "12345678-1234-1234-1234-123456789abc";
long baseTimestamp = 1704153600000L;
long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1);
- // File with timestamp exactly at threshold (should NOT be returned - we
want older than
- // threshold)
- Path fileAtThreshold = new Path(inProgressDir, thresholdTimestamp +
"_rs1.plog");
+ // File with rename timestamp exactly at threshold (should NOT be returned)
+ Path fileAtThreshold =
+ new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" +
thresholdTimestamp + ".plog");
- // File with timestamp just before threshold (should be returned)
- Path fileJustBeforeThreshold = new Path(inProgressDir, (thresholdTimestamp
- 1) + "_rs2.plog");
+ // File with rename timestamp just before threshold (should be returned)
+ Path fileJustBeforeThreshold = new Path(inProgressDir,
+ "1704153600000_rs2_" + uuid + "_" + (thresholdTimestamp - 1) + ".plog");
localFs.create(fileAtThreshold, true).close();
localFs.create(fileJustBeforeThreshold, true).close();
@@ -1315,30 +1379,36 @@ public class ReplicationLogTrackerTest {
// Get the in-progress directory path
Path inProgressDir = tracker.getInProgressDirPath();
+ String uuid = "12345678-1234-1234-1234-123456789abc";
long baseTimestamp = 1704153600000L;
long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1);
- // Valid old files (should be returned)
+ // Valid old files with rename timestamps before threshold (should be
returned)
+ long oldRename1 = baseTimestamp + TimeUnit.MINUTES.toMillis(30);
+ long oldRename2 = baseTimestamp + TimeUnit.MINUTES.toMillis(45);
Path oldFile1 =
- new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(30))
+ "_rs1.plog");
+ new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + oldRename1 +
".plog");
Path oldFile2 =
- new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(45))
+ "_rs2.plog");
+ new Path(inProgressDir, "1704153600000_rs2_" + uuid + "_" + oldRename2 +
".plog");
- // Valid new files (should not be returned)
+ // Valid file with rename timestamp after threshold (should not be
returned)
+ long newRename1 = baseTimestamp + TimeUnit.HOURS.toMillis(2);
Path newFile1 =
- new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(2)) +
"_rs3.plog");
+ new Path(inProgressDir, "1704153600000_rs3_" + uuid + "_" + newRename1 +
".plog");
- // Invalid files (should be skipped)
- Path invalidFile1 = new Path(inProgressDir, "invalid_timestamp_rs4.plog");
- Path invalidFile2 = new Path(inProgressDir, "1704153600000_rs5.txt"); //
wrong extension
- Path invalidFile3 = new Path(inProgressDir, "not_a_number_rs6.plog");
+ // Files without rename timestamp (should be skipped - only 3 parts)
+ Path noRenameFile = new Path(inProgressDir, "1704153600000_rs4_" + uuid +
".plog");
+ // Wrong extension (should be skipped by isValidLogFile)
+ Path invalidFile2 = new Path(inProgressDir, "1704153600000_rs5.txt");
+ // Non-numeric rename timestamp (should be skipped)
+ Path invalidFile3 = new Path(inProgressDir, "1704153600000_rs6_" + uuid +
"_notanumber.plog");
// Create all files
localFs.create(oldFile1, true).close();
localFs.create(oldFile2, true).close();
localFs.create(newFile1, true).close();
- localFs.create(invalidFile1, true).close();
+ localFs.create(noRenameFile, true).close();
localFs.create(invalidFile2, true).close();
localFs.create(invalidFile3, true).close();
@@ -1354,8 +1424,8 @@ public class ReplicationLogTrackerTest {
assertTrue("Should contain oldFile1",
resultFilenames.contains(oldFile1.getName()));
assertTrue("Should contain oldFile2",
resultFilenames.contains(oldFile2.getName()));
assertFalse("Should not contain newFile1",
resultFilenames.contains(newFile1.getName()));
- assertFalse("Should not contain invalidFile1",
- resultFilenames.contains(invalidFile1.getName()));
+ assertFalse("Should not contain noRenameFile",
+ resultFilenames.contains(noRenameFile.getName()));
assertFalse("Should not contain invalidFile2",
resultFilenames.contains(invalidFile2.getName()));
assertFalse("Should not contain invalidFile3",