This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new 5a9e2d50c9 PHOENIX-7786 Improved handling for empty files in Replication Log Processor (#2392)
5a9e2d50c9 is described below

commit 5a9e2d50c9860361cfe05cb7813f6496514670d0
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Thu Mar 26 01:26:14 2026 +0530

    PHOENIX-7786 Improved handling for empty files in Replication Log Processor (#2392)
---
 .../reader/ReplicationLogProcessor.java            |  51 +++++-----
 .../reader/ReplicationLogProcessorTestIT.java      | 108 +++++++++++++++++++--
 2 files changed, 127 insertions(+), 32 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index 84d4f1ba42..432cf9189d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -238,7 +238,7 @@ public class ReplicationLogProcessor implements Closeable {
 
       if (!logFileReaderOptional.isPresent()) {
         // This is an empty file, assume processed successfully and return
-        LOG.warn("Found empty file to process {}", filePath);
+        LOG.info("Ignoring zero length replication log file {}", filePath);
         return;
       }
 
@@ -303,32 +303,35 @@ public class ReplicationLogProcessor implements Closeable {
     LogFileReader logFileReader = new LogFileReader();
     LogFileReaderContext logFileReaderContext =
       new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath);
-    boolean isClosed = isFileClosed(fs, filePath);
-    if (isClosed) {
-      // As file is closed, ensure that the file has a valid header and trailer
-      logFileReader.init(logFileReaderContext);
-      return Optional.of(logFileReader);
-    } else {
-      LOG.warn("Found un-closed file {}. Starting lease recovery.", filePath);
+    try {
+      // Ensure to recover lease first, in case file was un-closed. If it was already closed,
+      // recoverLease would return true immediately.
       recoverLease(fs, filePath);
-      if (fs.getFileStatus(filePath).getLen() <= 0) {
-        // Found empty file, returning null LogReader
+      if (fs.getFileStatus(filePath).getLen() > 0) {
+        try {
+          // Acquired the lease, try to create reader with validation both header and trailer
+          logFileReader.init(logFileReaderContext);
+          return Optional.of(logFileReader);
+        } catch (InvalidLogTrailerException invalidLogTrailerException) {
+          // If trailer is missing or corrupt, create a new reader without trailer validation.
+          // We must create a new instance because close() sets the closed flag, making
+          // the old instance unusable for reading.
+          LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException);
+          closeReader(logFileReader);
+          logFileReaderContext.setValidateTrailer(false);
+          logFileReader = new LogFileReader();
+          logFileReader.init(logFileReaderContext);
+          return Optional.of(logFileReader);
+        }
+      } else {
+        // Ignore the file and returning empty LogReader.
         return Optional.empty();
       }
-      try {
-        // Acquired the lease, try to create reader with validation both header and trailer
-        logFileReader.init(logFileReaderContext);
-        return Optional.of(logFileReader);
-      } catch (InvalidLogTrailerException invalidLogTrailerException) {
-        // If trailer is missing or corrupt, create reader without trailer validation
-        LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException);
-        logFileReaderContext.setValidateTrailer(false);
-        logFileReader.init(logFileReaderContext);
-        return Optional.of(logFileReader);
-      } catch (IOException exception) {
-        LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
-        throw exception;
-      }
+    } catch (IOException exception) {
+      LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
+      // close the reader to avoid leaking socket connection
+      closeReader(logFileReader);
+      throw exception;
     }
   }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index 8a985f3ead..e486f82935 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -69,6 +69,7 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.log.LogFile;
 import org.apache.phoenix.replication.log.LogFileReader;
 import org.apache.phoenix.replication.log.LogFileReaderContext;
 import org.apache.phoenix.replication.log.LogFileTestUtil;
@@ -182,27 +183,118 @@ public class ReplicationLogProcessorTestIT extends ParallelStatsDisabledIT {
   }
 
   /**
-   * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file.
+   * Tests that createLogFileReader returns empty for a zero-byte file.
    */
   @Test
-  public void testCreateLogFileReaderWithInvalidLogFile() throws IOException {
+  public void testCreateLogFileReaderWithEmptyLogFile() throws IOException {
     Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI());
     localFs.create(invalidFilePath).close(); // Create empty file
     ReplicationLogProcessor replicationLogProcessor =
       new ReplicationLogProcessor(conf, testHAGroupName);
     try {
-      replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
-      fail("Should throw IOException for invalid file");
-    } catch (IOException e) {
-      // Should throw some kind of IOException when trying to read header
-      assertTrue("Should throw IOException", true);
+      Optional<LogFileReader> optionalLogFileReader =
+        replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
+      assertFalse("Reader should not be present for empty file", optionalLogFileReader.isPresent());
     } finally {
-      // Delete the invalid file
       localFs.delete(invalidFilePath);
       replicationLogProcessor.close();
     }
   }
 
+  /**
+   * Tests that createLogFileReader handles InvalidLogTrailerException by closing the old reader and
+   * returning a new usable reader. Simulates a writer crash after sync but before close, resulting
+   * in a file with valid header and data but no trailer.
+   */
+  @Test
+  public void testCreateLogFileReaderWithMissingTrailer() throws IOException {
+    Path filePath = new Path(testFolder.newFile("missing_trailer_file").toURI());
+    String tableName = "T_" + generateUniqueName();
+
+    LogFileWriter writer = initLogFileWriter(filePath);
+    Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+    writer.append(tableName, 1, put);
+    writer.sync();
+    // Do NOT call writer.close() -- skips trailer, simulates a writer crash after sync
+
+    assertTrue("File should exist", localFs.exists(filePath));
+    assertTrue("File should be non-empty", localFs.getFileStatus(filePath).getLen() > 0);
+
+    ReplicationLogProcessor spyProcessor =
+      Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));
+    try {
+      Optional<LogFileReader> optionalReader = spyProcessor.createLogFileReader(localFs, filePath);
+      assertTrue("Reader should be present for file with missing trailer",
+        optionalReader.isPresent());
+
+      // Verify the old reader was closed during InvalidLogTrailerException handling
+      Mockito.verify(spyProcessor, Mockito.times(1)).closeReader(Mockito.any(LogFileReader.class));
+
+      // Verify the returned reader is functional (not stuck with closed=true)
+      LogFileReader reader = optionalReader.get();
+      Iterator<LogFile.Record> iterator = reader.iterator();
+      assertTrue("Reader should be able to read records", iterator.hasNext());
+
+      LogFile.Record record = iterator.next();
+      assertNotNull("Record should not be null", record);
+      assertEquals("Table name should match", tableName, record.getHBaseTableName());
+
+      assertFalse("Should have no more records after the single appended record",
+        iterator.hasNext());
+      reader.close();
+    } finally {
+      localFs.delete(filePath);
+      spyProcessor.close();
+    }
+  }
+
+  /**
+   * Tests that when the first init() throws InvalidLogTrailerException and the second init()
+   * (without trailer validation) throws IOException, closeReader is called for both reader
+   * instances and the IOException propagates to the caller.
+   */
+  @Test
+  public void testCreateLogFileReaderWithTrailerExceptionThenIOException() throws IOException {
+    // Create a file with valid header + data but no trailer to trigger InvalidLogTrailerException
+    Path filePath = new Path(testFolder.newFile("trailer_then_io_failure").toURI());
+    String tableName = "T_" + generateUniqueName();
+
+    LogFileWriter writer = initLogFileWriter(filePath);
+    Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+    writer.append(tableName, 1, put);
+    writer.sync();
+    // Do NOT call writer.close() -- skips trailer
+
+    assertTrue("File should be non-empty", localFs.getFileStatus(filePath).getLen() > 0);
+
+    ReplicationLogProcessor spyProcessor =
+      Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));
+
+    // After closeReader is called for the first reader (InvalidLogTrailerException),
+    // corrupt the file with garbage so the second init() fails with IOException
+    Mockito.doAnswer(invocation -> {
+      invocation.callRealMethod();
+      org.apache.hadoop.fs.FSDataOutputStream out = localFs.create(filePath, true);
+      out.write("garbage data that is not a valid log file header".getBytes());
+      out.close();
+      return null;
+    }).doCallRealMethod().when(spyProcessor).closeReader(Mockito.any(LogFileReader.class));
+
+    try {
+      spyProcessor.createLogFileReader(localFs, filePath);
+      fail("Should throw IOException when second init() fails");
+    } catch (IOException e) {
+      // Expected: second init() fails because the file content is now corrupted
+    } finally {
+      localFs.delete(filePath);
+      spyProcessor.close();
+    }
+
+    // closeReader called twice: once for old reader (InvalidLogTrailerException),
+    // once for new reader (IOException from outer catch)
+    Mockito.verify(spyProcessor, Mockito.times(2)).closeReader(Mockito.any(LogFileReader.class));
+  }
+
   /**
    * Tests the closeReader method with both null and valid LogFileReader 
instances.
    */

Reply via email to