This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new 5779334797 PHOENIX-7865 Set explicit HDFS block size on replication log files (#2488)
5779334797 is described below

commit 57793347975266f33bc354e1ff1fa3847d9ba37a
Author: tkhurana <[email protected]>
AuthorDate: Fri May 22 10:44:09 2026 -0700

    PHOENIX-7865 Set explicit HDFS block size on replication log files (#2488)
---
 .../apache/phoenix/replication/ReplicationLog.java | 13 ++++-----
 .../phoenix/replication/ReplicationLogGroup.java   |  3 ++
 .../phoenix/replication/log/LogFileWriter.java     |  3 +-
 .../replication/log/LogFileWriterContext.java      | 13 ++++++++-
 .../phoenix/replication/log/LogFileTestUtil.java   | 34 ++++++++++++++++++++++
 .../replication/log/LogFileWriterSyncTest.java     |  6 ++--
 6 files changed, 58 insertions(+), 14 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index 44d4972e75..3949b3ef2b 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -68,6 +68,7 @@ public class ReplicationLog {
   protected final ReplicationLogGroup logGroup;
   protected final long rotationTimeMs;
   protected final long rotationSizeBytes;
+  protected final long fsBlockSize;
   protected final int maxRotationRetries;
   protected final Compression.Algorithm compression;
   protected final int maxAttempts;
@@ -109,13 +110,9 @@ public class ReplicationLog {
     double rotationSizePercent =
       conf.getDouble(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
         ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
-    long blockSize = shardManager.getFileSystem().getDefaultBlockSize();
-    if (configuredRotationSize > blockSize) {
-      LOG.warn("Configured rotation size {} exceeds HDFS block size {}; clamping to block size",
-        configuredRotationSize, blockSize);
-    }
-    long effectiveRotationSize = Math.min(configuredRotationSize, blockSize);
-    this.rotationSizeBytes = (long) (effectiveRotationSize * rotationSizePercent);
+    this.rotationSizeBytes = (long) (configuredRotationSize * rotationSizePercent);
+    this.fsBlockSize = conf.getLong(ReplicationLogGroup.REPLICATION_LOG_FS_BLOCK_SIZE_BYTES_KEY,
+      ReplicationLogGroup.DEFAULT_REPLICATION_LOG_FS_BLOCK_SIZE_BYTES);
     this.maxRotationRetries = conf.getInt(ReplicationLogGroup.REPLICATION_LOG_ROTATION_RETRIES_KEY,
       ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
     String compressionName = conf.get(ReplicationLogGroup.REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
@@ -150,7 +147,7 @@ public class ReplicationLog {
       logGroup.getServerName().getServerName());
     LogFileWriterContext writerContext = new LogFileWriterContext(logGroup.getConfiguration())
       .setFileSystem(replicationShardDirectoryManager.getFileSystem()).setFilePath(filePath)
-      .setCompression(compression);
+      .setCompression(compression).setFsBlockSize(fsBlockSize);
     LogFileWriter newWriter = new LogFileWriter();
     newWriter.init(writerContext);
     newWriter.setGeneration(writerGeneration.incrementAndGet());
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index e7e5a69a3b..c1e531461a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -151,6 +151,9 @@ public class ReplicationLogGroup {
   public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
     "phoenix.replication.log.rotation.size.percentage";
   public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
+  public static final String REPLICATION_LOG_FS_BLOCK_SIZE_BYTES_KEY =
+    "phoenix.replication.log.fs.block.size.bytes";
+  public static final long DEFAULT_REPLICATION_LOG_FS_BLOCK_SIZE_BYTES = 256 * 1024 * 1024L;
   public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
     "phoenix.replication.log.compression";
   public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = "NONE";
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index 7f5ba64084..dc0cb811f1 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -68,7 +68,8 @@ public class LogFileWriter implements LogFile.Writer {
     // TODO: Handle stream creation with proper permissions and overwrite options based on
     // config. For now we overwrite.
     FileSystem fs = context.getFileSystem();
-    FSDataOutputStream out = fs.create(context.getFilePath(), true);
+    FSDataOutputStream out = fs.createFile(context.getFilePath()).overwrite(true)
+      .blockSize(context.getFsBlockSize()).build();
     this.writer.init(context, new HDFSDataOutput(out, context.getUseHsync()));
     LOG.debug("Initialized LogFileWriter for path {}", context.getFilePath());
   }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java
index 396004c2f1..fdbe916c29 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java
@@ -50,6 +50,7 @@ public class LogFileWriterContext {
   private Compression.Algorithm compression;
   private LogFileCodec codec;
   private long maxBlockSize;
+  private long fsBlockSize;
   private boolean useHsync;
 
   public LogFileWriterContext(Configuration conf) {
@@ -122,6 +123,15 @@ public class LogFileWriterContext {
     return this;
   }
 
+  public long getFsBlockSize() {
+    return fsBlockSize;
+  }
+
+  public LogFileWriterContext setFsBlockSize(long fsBlockSize) {
+    this.fsBlockSize = fsBlockSize;
+    return this;
+  }
+
   public boolean getUseHsync() {
     return useHsync;
   }
@@ -134,7 +144,8 @@ public class LogFileWriterContext {
   @Override
   public String toString() {
     return "LogFileWriterContext [path=" + path + ", compression=" + compression + ", codec="
-      + codec + ", maxBlockSize=" + maxBlockSize + ", useHsync=" + useHsync + "]";
+      + codec + ", maxBlockSize=" + maxBlockSize + ", fsBlockSize=" + fsBlockSize + ", useHsync="
+      + useHsync + "]";
   }
 
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
index 963e22323f..1969fcc721 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
@@ -17,6 +17,12 @@
  */
 package org.apache.phoenix.replication.log;
 
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -24,6 +30,10 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
@@ -123,6 +133,30 @@ public interface LogFileTestUtil {
     }
   }
 
+  /**
+   * Concrete typed subclass of {@link FSDataOutputStreamBuilder} so Mockito can stub
+   * {@code build()} without bridge-method ambiguity from the self-bounded generics. Mockito mocks
+   * this class via subclass-bytecode generation; the (unused) protected constructor is never
+   * invoked.
+   */
+  abstract class TestBuilder extends FSDataOutputStreamBuilder<FSDataOutputStream, TestBuilder> {
+    protected TestBuilder(FileSystem fs, Path p) {
+      super(fs, p);
+    }
+  }
+
+  /**
+   * Stubs the {@code fs.createFile(...).overwrite(...).blockSize(...).build()} chain on the given
+   * mock {@link FileSystem} to return the supplied {@link FSDataOutputStream}.
+   */
+  static void stubCreateFile(FileSystem mockFs, FSDataOutputStream out) throws IOException {
+    TestBuilder builder = mock(TestBuilder.class);
+    when(mockFs.createFile(nullable(Path.class))).thenReturn(builder);
+    when(builder.overwrite(anyBoolean())).thenReturn(builder);
+    when(builder.blockSize(anyLong())).thenReturn(builder);
+    when(builder.build()).thenReturn(out);
+  }
+
   static void assertMutationEquals(String message, Mutation m1, Mutation m2) {
     try {
       if (!m1.toJSON().equals(m2.toJSON())) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
index cd08ea5334..8ecae075e9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.replication.log;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -60,7 +58,7 @@ public class LogFileWriterSyncTest {
     // -hsync-> FSDataOutputStream -hsync-> internalOutput
     internalOutput = spy(new LogFileTestUtil.SyncableByteArrayOutputStream());
     mockFs = mock(FileSystem.class);
-    when(mockFs.create(any(), anyBoolean())).thenReturn(
+    LogFileTestUtil.stubCreateFile(mockFs,
       new FSDataOutputStream((OutputStream) internalOutput, new FileSystem.Statistics("hdfs"), 0));
     when(internalOutput.getPos()).thenReturn(100L);
 
@@ -201,7 +199,7 @@ public class LogFileWriterSyncTest {
     Configuration hflushConf = HBaseConfiguration.create();
     SyncableDataOutput hflushOutput = spy(new LogFileTestUtil.SyncableByteArrayOutputStream());
     FileSystem hflushMockFs = mock(FileSystem.class);
-    when(hflushMockFs.create(any(), anyBoolean())).thenReturn(
+    LogFileTestUtil.stubCreateFile(hflushMockFs,
       new FSDataOutputStream((OutputStream) hflushOutput, new FileSystem.Statistics("hdfs"), 0));
     when(hflushOutput.getPos()).thenReturn(100L);
 

Reply via email to