This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 5a9e2d50c9 PHOENIX-7786 Improved handling for empty files in
Replication Log Processor (#2392)
5a9e2d50c9 is described below
commit 5a9e2d50c9860361cfe05cb7813f6496514670d0
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Thu Mar 26 01:26:14 2026 +0530
PHOENIX-7786 Improved handling for empty files in Replication Log Processor
(#2392)
---
.../reader/ReplicationLogProcessor.java | 51 +++++-----
.../reader/ReplicationLogProcessorTestIT.java | 108 +++++++++++++++++++--
2 files changed, 127 insertions(+), 32 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index 84d4f1ba42..432cf9189d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -238,7 +238,7 @@ public class ReplicationLogProcessor implements Closeable {
if (!logFileReaderOptional.isPresent()) {
// This is an empty file, assume processed successfully and return
- LOG.warn("Found empty file to process {}", filePath);
+ LOG.info("Ignoring zero length replication log file {}", filePath);
return;
}
@@ -303,32 +303,35 @@ public class ReplicationLogProcessor implements Closeable {
LogFileReader logFileReader = new LogFileReader();
LogFileReaderContext logFileReaderContext =
new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath);
- boolean isClosed = isFileClosed(fs, filePath);
- if (isClosed) {
- // As file is closed, ensure that the file has a valid header and trailer
- logFileReader.init(logFileReaderContext);
- return Optional.of(logFileReader);
- } else {
- LOG.warn("Found un-closed file {}. Starting lease recovery.", filePath);
+ try {
+ // Ensure to recover lease first, in case file was un-closed. If it was already closed,
+ // recoverLease would return true immediately.
recoverLease(fs, filePath);
- if (fs.getFileStatus(filePath).getLen() <= 0) {
- // Found empty file, returning null LogReader
+ if (fs.getFileStatus(filePath).getLen() > 0) {
+ try {
+ // Acquired the lease, try to create reader with validation both header and trailer
+ logFileReader.init(logFileReaderContext);
+ return Optional.of(logFileReader);
+ } catch (InvalidLogTrailerException invalidLogTrailerException) {
+ // If trailer is missing or corrupt, create a new reader without trailer validation.
+ // We must create a new instance because close() sets the closed flag, making
+ // the old instance unusable for reading.
+ LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException);
+ closeReader(logFileReader);
+ logFileReaderContext.setValidateTrailer(false);
+ logFileReader = new LogFileReader();
+ logFileReader.init(logFileReaderContext);
+ return Optional.of(logFileReader);
+ }
+ } else {
+ // Ignore the file and returning empty LogReader.
return Optional.empty();
}
- try {
- // Acquired the lease, try to create reader with validation both header and trailer
- logFileReader.init(logFileReaderContext);
- return Optional.of(logFileReader);
- } catch (InvalidLogTrailerException invalidLogTrailerException) {
- // If trailer is missing or corrupt, create reader without trailer validation
- LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException);
- logFileReaderContext.setValidateTrailer(false);
- logFileReader.init(logFileReaderContext);
- return Optional.of(logFileReader);
- } catch (IOException exception) {
- LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
- throw exception;
- }
+ } catch (IOException exception) {
+ LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
+ // close the reader to avoid leaking socket connection
+ closeReader(logFileReader);
+ throw exception;
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index 8a985f3ead..e486f82935 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -69,6 +69,7 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.log.LogFile;
import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;
import org.apache.phoenix.replication.log.LogFileTestUtil;
@@ -182,27 +183,118 @@ public class ReplicationLogProcessorTestIT extends ParallelStatsDisabledIT {
}
/**
- * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file.
+ * Tests that createLogFileReader returns empty for a zero-byte file.
*/
@Test
- public void testCreateLogFileReaderWithInvalidLogFile() throws IOException {
+ public void testCreateLogFileReaderWithEmptyLogFile() throws IOException {
Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI());
localFs.create(invalidFilePath).close(); // Create empty file
ReplicationLogProcessor replicationLogProcessor =
new ReplicationLogProcessor(conf, testHAGroupName);
try {
- replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
- fail("Should throw IOException for invalid file");
- } catch (IOException e) {
- // Should throw some kind of IOException when trying to read header
- assertTrue("Should throw IOException", true);
+ Optional<LogFileReader> optionalLogFileReader =
+ replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
+ assertFalse("Reader should not be present for empty file", optionalLogFileReader.isPresent());
} finally {
- // Delete the invalid file
localFs.delete(invalidFilePath);
replicationLogProcessor.close();
}
}
+ /**
+ * Tests that createLogFileReader handles InvalidLogTrailerException by closing the old reader and
+ * returning a new usable reader. Simulates a writer crash after sync but before close, resulting
+ * in a file with valid header and data but no trailer.
+ */
+ @Test
+ public void testCreateLogFileReaderWithMissingTrailer() throws IOException {
+ Path filePath = new Path(testFolder.newFile("missing_trailer_file").toURI());
+ String tableName = "T_" + generateUniqueName();
+
+ LogFileWriter writer = initLogFileWriter(filePath);
+ Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+ writer.append(tableName, 1, put);
+ writer.sync();
+ // Do NOT call writer.close() -- skips trailer, simulates a writer crash after sync
+
+ assertTrue("File should exist", localFs.exists(filePath));
+ assertTrue("File should be non-empty", localFs.getFileStatus(filePath).getLen() > 0);
+
+ ReplicationLogProcessor spyProcessor =
+ Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));
+ try {
+ Optional<LogFileReader> optionalReader = spyProcessor.createLogFileReader(localFs, filePath);
+ assertTrue("Reader should be present for file with missing trailer",
+ optionalReader.isPresent());
+
+ // Verify the old reader was closed during InvalidLogTrailerException handling
+ Mockito.verify(spyProcessor, Mockito.times(1)).closeReader(Mockito.any(LogFileReader.class));
+
+ // Verify the returned reader is functional (not stuck with closed=true)
+ LogFileReader reader = optionalReader.get();
+ Iterator<LogFile.Record> iterator = reader.iterator();
+ assertTrue("Reader should be able to read records", iterator.hasNext());
+
+ LogFile.Record record = iterator.next();
+ assertNotNull("Record should not be null", record);
+ assertEquals("Table name should match", tableName, record.getHBaseTableName());
+
+ assertFalse("Should have no more records after the single appended record",
+ iterator.hasNext());
+ reader.close();
+ } finally {
+ localFs.delete(filePath);
+ spyProcessor.close();
+ }
+ }
+
+ /**
+ * Tests that when the first init() throws InvalidLogTrailerException and the second init()
+ * (without trailer validation) throws IOException, closeReader is called for both reader
+ * instances and the IOException propagates to the caller.
+ */
+ @Test
+ public void testCreateLogFileReaderWithTrailerExceptionThenIOException() throws IOException {
+ // Create a file with valid header + data but no trailer to trigger InvalidLogTrailerException
+ Path filePath = new Path(testFolder.newFile("trailer_then_io_failure").toURI());
+ String tableName = "T_" + generateUniqueName();
+
+ LogFileWriter writer = initLogFileWriter(filePath);
+ Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+ writer.append(tableName, 1, put);
+ writer.sync();
+ // Do NOT call writer.close() -- skips trailer
+
+ assertTrue("File should be non-empty", localFs.getFileStatus(filePath).getLen() > 0);
+
+ ReplicationLogProcessor spyProcessor =
+ Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));
+
+ // After closeReader is called for the first reader (InvalidLogTrailerException),
+ // corrupt the file with garbage so the second init() fails with IOException
+ Mockito.doAnswer(invocation -> {
+ invocation.callRealMethod();
+ org.apache.hadoop.fs.FSDataOutputStream out = localFs.create(filePath, true);
+ out.write("garbage data that is not a valid log file header".getBytes());
+ out.close();
+ return null;
+ }).doCallRealMethod().when(spyProcessor).closeReader(Mockito.any(LogFileReader.class));
+
+ try {
+ spyProcessor.createLogFileReader(localFs, filePath);
+ fail("Should throw IOException when second init() fails");
+ } catch (IOException e) {
+ // Expected: second init() fails because the file content is now corrupted
+ } finally {
+ localFs.delete(filePath);
+ spyProcessor.close();
+ }
+
+ // closeReader called twice: once for old reader (InvalidLogTrailerException),
+ // once for new reader (IOException from outer catch)
+ Mockito.verify(spyProcessor, Mockito.times(2)).closeReader(Mockito.any(LogFileReader.class));
+ }
+
/**
* Tests the closeReader method with both null and valid LogFileReader instances.
*/