This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new cdcd3e7b42 PHOENIX-7815 Make WAL sync durability configurable (#2434)
cdcd3e7b42 is described below
commit cdcd3e7b42507900da458333ce520077b66c095c
Author: tkhurana <[email protected]>
AuthorDate: Fri May 1 11:17:14 2026 -0700
PHOENIX-7815 Make WAL sync durability configurable (#2434)
---
.../phoenix/replication/log/AsyncFSDataOutput.java | 10 ++++++--
.../phoenix/replication/log/HDFSDataOutput.java | 10 ++++++--
.../phoenix/replication/log/LogFileWriter.java | 2 +-
.../replication/log/LogFileWriterContext.java | 14 ++++++++++-
.../phoenix/replication/log/LogFileFormatTest.java | 2 +-
.../replication/log/LogFileWriterSyncTest.java | 29 ++++++++++++++++++++++
6 files changed, 60 insertions(+), 7 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/AsyncFSDataOutput.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/AsyncFSDataOutput.java
index 11ce86adc4..18776e2c6a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/AsyncFSDataOutput.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/AsyncFSDataOutput.java
@@ -31,9 +31,11 @@ import org.apache.hadoop.hbase.util.Bytes;
public class AsyncFSDataOutput implements SyncableDataOutput {
private final AsyncFSOutput delegate;
+ private final boolean useHsync;
- public AsyncFSDataOutput(AsyncFSOutput delegate) {
+ public AsyncFSDataOutput(AsyncFSOutput delegate, boolean useHsync) {
this.delegate = delegate;
+ this.useHsync = useHsync;
}
@Override
@@ -158,7 +160,11 @@ public class AsyncFSDataOutput implements SyncableDataOutput {
@Override
public void sync() throws IOException {
- hsync();
+ if (useHsync) {
+ hsync();
+ } else {
+ hflush();
+ }
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/HDFSDataOutput.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/HDFSDataOutput.java
index 9777b76eca..4f37575684 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/HDFSDataOutput.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/HDFSDataOutput.java
@@ -28,9 +28,11 @@ import org.apache.hadoop.fs.FSDataOutputStream;
public class HDFSDataOutput implements SyncableDataOutput {
private final FSDataOutputStream delegate;
+ private final boolean useHsync;
- public HDFSDataOutput(FSDataOutputStream delegate) {
+ public HDFSDataOutput(FSDataOutputStream delegate, boolean useHsync) {
this.delegate = delegate;
+ this.useHsync = useHsync;
}
@Override
@@ -125,7 +127,11 @@ public class HDFSDataOutput implements SyncableDataOutput {
@Override
public void sync() throws IOException {
- delegate.hsync();
+ if (useHsync) {
+ delegate.hsync();
+ } else {
+ delegate.hflush();
+ }
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index 923fc0f7bf..e1c3aadf49 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -68,7 +68,7 @@ public class LogFileWriter implements LogFile.Writer {
// config. For now we overwrite.
FileSystem fs = context.getFileSystem();
FSDataOutputStream out = fs.create(context.getFilePath(), true);
- this.writer.init(context, new HDFSDataOutput(out));
+ this.writer.init(context, new HDFSDataOutput(out, context.getUseHsync()));
LOG.debug("Initialized LogFileWriter for path {}", context.getFilePath());
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java
index c8d909f993..396004c2f1 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriterContext.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +50,7 @@ public class LogFileWriterContext {
private Compression.Algorithm compression;
private LogFileCodec codec;
private long maxBlockSize;
+ private boolean useHsync;
public LogFileWriterContext(Configuration conf) {
this.conf = conf;
@@ -65,6 +67,7 @@ public class LogFileWriterContext {
this.maxBlockSize, DEFAULT_LOGFILE_BLOCK_SIZE);
this.maxBlockSize = DEFAULT_LOGFILE_BLOCK_SIZE;
}
+ this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY,
HRegion.DEFAULT_WAL_HSYNC);
// Note: When we have multiple codec types, instantiate the appropriate
type based on
// configuration;
this.codec = new LogFileCodec();
@@ -119,10 +122,19 @@ public class LogFileWriterContext {
return this;
}
+ public boolean getUseHsync() {
+ return useHsync;
+ }
+
+ public LogFileWriterContext setUseHsync(boolean useHsync) {
+ this.useHsync = useHsync;
+ return this;
+ }
+
@Override
public String toString() {
return "LogFileWriterContext [path=" + path + ", compression=" +
compression + ", codec="
- + codec + ", maxBlockSize=" + maxBlockSize + "]";
+ + codec + ", maxBlockSize=" + maxBlockSize + ", useHsync=" + useHsync +
"]";
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
index 0d473cfd65..0044483008 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
@@ -665,7 +665,7 @@ public class LogFileFormatTest {
public void hsync() throws IOException {
writerDos.flush();
}
- });
+ }, false);
writer.init(writerContext, output);
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
index 85d68b32c8..5f07d3b167 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -50,6 +51,7 @@ public class LogFileWriterSyncTest {
@Before
public void setUp() throws IOException {
conf = HBaseConfiguration.create();
+ conf.setBoolean(HRegion.WAL_HSYNC_CONF_KEY, true);
// This is structured so we verify that FSDataOutputStream correctly calls
hsync() on its
// understream, which in this case will be our
SyncableByteArrayOutputStream.
@@ -189,4 +191,31 @@ public class LogFileWriterSyncTest {
verify(internalOutput, times(1)).hsync();
}
+ @Test
+ public void testSyncWithHflush() throws IOException {
+ // Create a separate writer with default config (hbase.wal.hsync=false)
+ Configuration hflushConf = HBaseConfiguration.create();
+ SyncableDataOutput hflushOutput = spy(new
LogFileTestUtil.SyncableByteArrayOutputStream());
+ FileSystem hflushMockFs = mock(FileSystem.class);
+ when(hflushMockFs.create(any(), anyBoolean())).thenReturn(
+ new FSDataOutputStream((OutputStream) hflushOutput, new
FileSystem.Statistics("hdfs"), 0));
+ when(hflushOutput.getPos()).thenReturn(100L);
+
+ LogFileWriterContext hflushContext =
+ new LogFileWriterContext(hflushConf).setFileSystem(hflushMockFs);
+ LogFileWriter hflushWriter = new LogFileWriter();
+ hflushWriter.init(hflushContext);
+
+ try {
+ Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);
+ hflushWriter.append("TBL", 1L, m1);
+ hflushWriter.sync();
+
+ verify(hflushOutput, times(1)).hflush();
+ verify(hflushOutput, never()).hsync();
+ } finally {
+ hflushWriter.close();
+ }
+ }
+
}