This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 5779334797 PHOENIX-7865 Set explicit HDFS block size on replication log files (#2488)
5779334797 is described below
commit 57793347975266f33bc354e1ff1fa3847d9ba37a
Author: tkhurana <[email protected]>
AuthorDate: Fri May 22 10:44:09 2026 -0700
PHOENIX-7865 Set explicit HDFS block size on replication log files (#2488)
---
.../apache/phoenix/replication/ReplicationLog.java | 13 ++++-----
.../phoenix/replication/ReplicationLogGroup.java | 3 ++
.../phoenix/replication/log/LogFileWriter.java | 3 +-
.../replication/log/LogFileWriterContext.java | 13 ++++++++-
.../phoenix/replication/log/LogFileTestUtil.java | 34 ++++++++++++++++++++++
.../replication/log/LogFileWriterSyncTest.java | 6 ++--
6 files changed, 58 insertions(+), 14 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index 44d4972e75..3949b3ef2b 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -68,6 +68,7 @@ public class ReplicationLog {
protected final ReplicationLogGroup logGroup;
protected final long rotationTimeMs;
protected final long rotationSizeBytes;
+ protected final long fsBlockSize;
protected final int maxRotationRetries;
protected final Compression.Algorithm compression;
protected final int maxAttempts;
@@ -109,13 +110,9 @@ public class ReplicationLog {
double rotationSizePercent =
conf.getDouble(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
- long blockSize = shardManager.getFileSystem().getDefaultBlockSize();
- if (configuredRotationSize > blockSize) {
- LOG.warn("Configured rotation size {} exceeds HDFS block size {};
clamping to block size",
- configuredRotationSize, blockSize);
- }
- long effectiveRotationSize = Math.min(configuredRotationSize, blockSize);
- this.rotationSizeBytes = (long) (effectiveRotationSize *
rotationSizePercent);
+ this.rotationSizeBytes = (long) (configuredRotationSize *
rotationSizePercent);
+ this.fsBlockSize =
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_FS_BLOCK_SIZE_BYTES_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_FS_BLOCK_SIZE_BYTES);
this.maxRotationRetries =
conf.getInt(ReplicationLogGroup.REPLICATION_LOG_ROTATION_RETRIES_KEY,
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
String compressionName =
conf.get(ReplicationLogGroup.REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
@@ -150,7 +147,7 @@ public class ReplicationLog {
logGroup.getServerName().getServerName());
LogFileWriterContext writerContext = new
LogFileWriterContext(logGroup.getConfiguration())
.setFileSystem(replicationShardDirectoryManager.getFileSystem()).setFilePath(filePath)
- .setCompression(compression);
+ .setCompression(compression).setFsBlockSize(fsBlockSize);
LogFileWriter newWriter = new LogFileWriter();
newWriter.init(writerContext);
newWriter.setGeneration(writerGeneration.incrementAndGet());
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index e7e5a69a3b..c1e531461a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -151,6 +151,9 @@ public class ReplicationLogGroup {
public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
"phoenix.replication.log.rotation.size.percentage";
public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE
= 0.95;
+ public static final String REPLICATION_LOG_FS_BLOCK_SIZE_BYTES_KEY =
+ "phoenix.replication.log.fs.block.size.bytes";
+ public static final long DEFAULT_REPLICATION_LOG_FS_BLOCK_SIZE_BYTES = 256 *
1024 * 1024L;
public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
"phoenix.replication.log.compression";
public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM =
"NONE";
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index 7f5ba64084..dc0cb811f1 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -68,7 +68,8 @@ public class LogFileWriter implements LogFile.Writer {
// TODO: Handle stream creation with proper permissions and overwrite
options based on
// config. For now we overwrite.
FileSystem fs = context.getFileSystem();
- FSDataOutputStream out = fs.create(context.getFilePath(), true);
+ FSDataOutputStream out =
fs.createFile(context.getFilePath()).overwrite(true)
+ .blockSize(context.getFsBlockSize()).build();
this.writer.init(context, new HDFSDataOutput(out, context.getUseHsync()));
LOG.debug("Initialized LogFileWriter for path {}", context.getFilePath());
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java
index 396004c2f1..fdbe916c29 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java
@@ -50,6 +50,7 @@ public class LogFileWriterContext {
private Compression.Algorithm compression;
private LogFileCodec codec;
private long maxBlockSize;
+ private long fsBlockSize;
private boolean useHsync;
public LogFileWriterContext(Configuration conf) {
@@ -122,6 +123,15 @@ public class LogFileWriterContext {
return this;
}
+ public long getFsBlockSize() {
+ return fsBlockSize;
+ }
+
+ public LogFileWriterContext setFsBlockSize(long fsBlockSize) {
+ this.fsBlockSize = fsBlockSize;
+ return this;
+ }
+
public boolean getUseHsync() {
return useHsync;
}
@@ -134,7 +144,8 @@ public class LogFileWriterContext {
@Override
public String toString() {
return "LogFileWriterContext [path=" + path + ", compression=" +
compression + ", codec="
- + codec + ", maxBlockSize=" + maxBlockSize + ", useHsync=" + useHsync +
"]";
+ + codec + ", maxBlockSize=" + maxBlockSize + ", fsBlockSize=" +
fsBlockSize + ", useHsync="
+ + useHsync + "]";
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
index 963e22323f..1969fcc721 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
@@ -17,6 +17,12 @@
*/
package org.apache.phoenix.replication.log;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -24,6 +30,10 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
@@ -123,6 +133,30 @@ public interface LogFileTestUtil {
}
}
+ /**
+ * Concrete typed subclass of {@link FSDataOutputStreamBuilder} so Mockito
can stub
+ * {@code build()} without bridge-method ambiguity from the self-bounded
generics. Mockito mocks
+ * this class via subclass-bytecode generation; the (unused) protected
constructor is never
+ * invoked.
+ */
+ abstract class TestBuilder extends
FSDataOutputStreamBuilder<FSDataOutputStream, TestBuilder> {
+ protected TestBuilder(FileSystem fs, Path p) {
+ super(fs, p);
+ }
+ }
+
+ /**
+ * Stubs the {@code
fs.createFile(...).overwrite(...).blockSize(...).build()} chain on the given
+ * mock {@link FileSystem} to return the supplied {@link FSDataOutputStream}.
+ */
+ static void stubCreateFile(FileSystem mockFs, FSDataOutputStream out) throws
IOException {
+ TestBuilder builder = mock(TestBuilder.class);
+ when(mockFs.createFile(nullable(Path.class))).thenReturn(builder);
+ when(builder.overwrite(anyBoolean())).thenReturn(builder);
+ when(builder.blockSize(anyLong())).thenReturn(builder);
+ when(builder.build()).thenReturn(out);
+ }
+
static void assertMutationEquals(String message, Mutation m1, Mutation m2) {
try {
if (!m1.toJSON().equals(m2.toJSON())) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
index cd08ea5334..8ecae075e9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
@@ -17,8 +17,6 @@
*/
package org.apache.phoenix.replication.log;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -60,7 +58,7 @@ public class LogFileWriterSyncTest {
// -hsync-> FSDataOutputStream -hsync-> internalOutput
internalOutput = spy(new LogFileTestUtil.SyncableByteArrayOutputStream());
mockFs = mock(FileSystem.class);
- when(mockFs.create(any(), anyBoolean())).thenReturn(
+ LogFileTestUtil.stubCreateFile(mockFs,
new FSDataOutputStream((OutputStream) internalOutput, new
FileSystem.Statistics("hdfs"), 0));
when(internalOutput.getPos()).thenReturn(100L);
@@ -201,7 +199,7 @@ public class LogFileWriterSyncTest {
Configuration hflushConf = HBaseConfiguration.create();
SyncableDataOutput hflushOutput = spy(new
LogFileTestUtil.SyncableByteArrayOutputStream());
FileSystem hflushMockFs = mock(FileSystem.class);
- when(hflushMockFs.create(any(), anyBoolean())).thenReturn(
+ LogFileTestUtil.stubCreateFile(hflushMockFs,
new FSDataOutputStream((OutputStream) hflushOutput, new
FileSystem.Statistics("hdfs"), 0));
when(hflushOutput.getPos()).thenReturn(100L);