This is an automated email from the ASF dual-hosted git repository.
edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new ac80267fcb Add utility to create an empty wal file. (#4116)
ac80267fcb is described below
commit ac80267fcb797dd57c5fead1cdd4e96d8a480a00
Author: EdColeman <[email protected]>
AuthorDate: Tue Apr 9 11:24:18 2024 -0400
Add utility to create an empty wal file. (#4116)
Updated the CreateEmpty utility with an option to create empty wal file(s).
---
.../accumulo/core/file/rfile/CreateEmpty.java | 113 ---------
.../apache/accumulo/tserver/util/CreateEmpty.java | 198 ++++++++++++++++
.../accumulo/tserver/util/CreateEmptyTest.java | 263 +++++++++++++++++++++
.../test/functional/RecoveryWithEmptyRFileIT.java | 3 +-
.../apache/accumulo/test/start/KeywordStartIT.java | 2 +-
5 files changed, 464 insertions(+), 115 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
deleted file mode 100644
index 5a8d4dc104..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.file.rfile;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.accumulo.core.cli.Help;
-import org.apache.accumulo.core.conf.DefaultConfiguration;
-import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.rfile.bcfile.Compression;
-import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
-import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
-import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
-import org.apache.accumulo.start.spi.KeywordExecutable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.IParameterValidator;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.google.auto.service.AutoService;
-
-/**
- * Create an empty RFile for use in recovering from data loss where Accumulo
still refers internally
- * to a path.
- */
-@AutoService(KeywordExecutable.class)
-public class CreateEmpty implements KeywordExecutable {
- private static final Logger log = LoggerFactory.getLogger(CreateEmpty.class);
-
- public static class NamedLikeRFile implements IParameterValidator {
- @Override
- public void validate(String name, String value) throws ParameterException {
- if (!value.endsWith(".rf")) {
- throw new ParameterException("File must end with .rf and '" + value +
"' does not.");
- }
- }
- }
-
- public static class IsSupportedCompressionAlgorithm implements
IParameterValidator {
- @Override
- public void validate(String name, String value) throws ParameterException {
- List<String> algorithms = Compression.getSupportedAlgorithms();
- if (!algorithms.contains(value)) {
- throw new ParameterException("Compression codec must be one of " +
algorithms);
- }
- }
- }
-
- static class Opts extends Help {
- @Parameter(names = {"-c", "--codec"}, description = "the compression codec
to use.",
- validateWith = IsSupportedCompressionAlgorithm.class)
- String codec = new NoCompression().getName();
- @Parameter(
- description = " <path> { <path> ... } Each path given is a URL."
- + " Relative paths are resolved according to the default
filesystem defined in"
- + " your Hadoop configuration, which is usually an HDFS instance.",
- required = true, validateWith = NamedLikeRFile.class)
- List<String> files = new ArrayList<>();
- }
-
- public static void main(String[] args) throws Exception {
- new CreateEmpty().execute(args);
- }
-
- @Override
- public String keyword() {
- return "create-empty";
- }
-
- @Override
- public String description() {
- return "Creates an empty rfile";
- }
-
- @Override
- public void execute(String[] args) throws Exception {
- Configuration conf = new Configuration();
-
- Opts opts = new Opts();
- opts.parseArgs("accumulo create-empty", args);
-
- for (String arg : opts.files) {
- UnreferencedTabletFile file = UnreferencedTabletFile.of(conf, new
Path(arg));
- log.info("Writing to file '{}'", file);
- FileSKVWriter writer = new RFileOperations().newWriterBuilder()
- .forFile(file, file.getPath().getFileSystem(conf), conf,
NoCryptoServiceFactory.NONE)
-
.withTableConfiguration(DefaultConfiguration.getInstance()).withCompression(opts.codec)
- .build();
- writer.close();
- }
- }
-
-}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/util/CreateEmpty.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/util/CreateEmpty.java
new file mode 100644
index 0000000000..2f15d2a382
--- /dev/null
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/util/CreateEmpty.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.tserver.log.DfsLogger.LOG_FILE_HEADER_V4;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
+import org.apache.accumulo.core.crypto.CryptoUtils;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.IParameterValidator;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.auto.service.AutoService;
+
+/**
+ * Create an empty RFile for use in recovering from data loss where Accumulo
still refers internally
+ * to a path.
+ */
+@AutoService(KeywordExecutable.class)
+public class CreateEmpty implements KeywordExecutable {
+ private static final Logger LOG = LoggerFactory.getLogger(CreateEmpty.class);
+ public static final String RF_EXTENSION = ".rf";
+ public static final String WAL_EXTENSION = ".wal";
+
+ public static class MatchesValidFileExtension implements IParameterValidator
{
+ @Override
+ public void validate(String name, String value) throws ParameterException {
+ if (value.endsWith(RF_EXTENSION) || value.endsWith(WAL_EXTENSION)) {
+ return;
+ }
+ throw new ParameterException("File must end with either " + RF_EXTENSION
+ " or "
+ + WAL_EXTENSION + " and '" + value + "' does not.");
+ }
+ }
+
+ public static class IsSupportedCompressionAlgorithm implements
IParameterValidator {
+ @Override
+ public void validate(String name, String value) throws ParameterException {
+ List<String> algorithms = Compression.getSupportedAlgorithms();
+ if (!algorithms.contains(value)) {
+ throw new ParameterException("Compression codec must be one of " +
algorithms);
+ }
+ }
+ }
+
+ static class Opts extends ConfigOpts {
+ @Parameter(names = {"-c", "--codec"}, description = "the compression codec
to use.",
+ validateWith = IsSupportedCompressionAlgorithm.class)
+ String codec = new NoCompression().getName();
+ @Parameter(
+ description = " <path> { <path> ... } Each path given is a URL."
+ + " Relative paths are resolved according to the default
filesystem defined in"
+ + " your Hadoop configuration, which is usually an HDFS instance.",
+ required = true, validateWith = MatchesValidFileExtension.class)
+ List<String> files = new ArrayList<>();
+
+ public enum OutFileType {
+ RF, WAL
+ }
+
+ // rfile as default keeps previous behaviour
+ @Parameter(names = "--type")
+ public OutFileType fileType = OutFileType.RF;
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ new CreateEmpty().execute(args);
+ }
+
+ @Override
+ public String keyword() {
+ return "create-empty";
+ }
+
+ @Override
+ public String description() {
+ return "Creates empty RFiles (RF) or empty write-ahead log (WAL) files for
emergency recovery";
+ }
+
+ @Override
+ public void execute(String[] args) throws Exception {
+
+ Opts opts = new Opts();
+ opts.parseArgs("accumulo create-empty", args);
+
+ var siteConfig = opts.getSiteConfiguration();
+ try (ServerContext context = new ServerContext(siteConfig)) {
+ switch (opts.fileType) {
+ case RF:
+ createEmptyRFile(opts, context);
+ break;
+ case WAL:
+ createEmptyWal(opts, context);
+ break;
+ default:
+ throw new ParameterException("file type must be RF or WAL, received:
" + opts.fileType);
+ }
+ }
+ }
+
+ void createEmptyRFile(final Opts opts, final ServerContext context) throws
IOException {
+ var vm = context.getVolumeManager();
+
+ CryptoEnvironment env = new
CryptoEnvironmentImpl(CryptoEnvironment.Scope.TABLE);
+ CryptoService cryptoService = context.getCryptoFactory().getService(env,
+ context.getConfiguration().getAllCryptoProperties());
+
+ for (String filename : opts.files) {
+ Path path = new Path(filename);
+ checkFileExists(path, vm);
+ UnreferencedTabletFile tabletFile =
+ UnreferencedTabletFile.of(vm.getFileSystemByPath(path), path);
+ LOG.info("Writing to file '{}'", tabletFile);
+ FileSKVWriter writer = new RFileOperations().newWriterBuilder()
+ .forFile(tabletFile, vm.getFileSystemByPath(path),
context.getHadoopConf(), cryptoService)
+
.withTableConfiguration(DefaultConfiguration.getInstance()).withCompression(opts.codec)
+ .build();
+ writer.close();
+ }
+ }
+
+ void createEmptyWal(Opts opts, ServerContext context) throws IOException {
+ final LogFileValue EMPTY = new LogFileValue();
+
+ var vm = context.getVolumeManager();
+
+ for (String filename : opts.files) {
+ Path path = new Path(filename);
+ checkFileExists(path, vm);
+ try (var out = new DataOutputStream(vm.create(path))) {
+ LOG.info("Output file: {}", path);
+
+ out.write(LOG_FILE_HEADER_V4.getBytes(UTF_8));
+
+ CryptoEnvironment env = new
CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL);
+ CryptoService cryptoService =
context.getCryptoFactory().getService(env,
+ context.getConfiguration().getAllCryptoProperties());
+
+ byte[] cryptoParams =
cryptoService.getFileEncrypter(env).getDecryptionParameters();
+ CryptoUtils.writeParams(cryptoParams, out);
+
+ LogFileKey key = new LogFileKey();
+ key.event = OPEN;
+ key.tserverSession = "";
+
+ key.write(out);
+ EMPTY.write(out);
+ }
+ }
+ }
+
+ private void checkFileExists(final Path path, final VolumeManager vm) throws
IOException {
+ if (vm.exists(path)) {
+ throw new IllegalArgumentException(path + " exists");
+ }
+ }
+}
diff --git
a/server/tserver/src/test/java/org/apache/accumulo/tserver/util/CreateEmptyTest.java
b/server/tserver/src/test/java/org/apache/accumulo/tserver/util/CreateEmptyTest.java
new file mode 100644
index 0000000000..ad9f73f948
--- /dev/null
+++
b/server/tserver/src/test/java/org/apache/accumulo/tserver/util/CreateEmptyTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.tserver.log.DfsLogger.LOG_FILE_HEADER_V4;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
+import org.apache.accumulo.core.crypto.CryptoUtils;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class CreateEmptyTest {
+ @TempDir
+ private static File tempDir;
+
+ private ServerContext context;
+
+ @BeforeEach
+ public void init() throws IOException {
+ ConfigurationCopy config = new
ConfigurationCopy(DefaultConfiguration.getInstance());
+ config.set(Property.INSTANCE_VOLUMES.getKey(), "file:///");
+
+ context = mock(ServerContext.class);
+ expect(context.getCryptoFactory()).andReturn(new
GenericCryptoServiceFactory()).anyTimes();
+ expect(context.getConfiguration()).andReturn(config).anyTimes();
+ expect(context.getHadoopConf()).andReturn(new Configuration()).anyTimes();
+ VolumeManager volumeManager = VolumeManagerImpl.get(config, new
Configuration());
+ expect(context.getVolumeManager()).andReturn(volumeManager).anyTimes();
+ replay(context);
+ }
+
+ @AfterEach
+ public void verifyMock() {
+ verify(context);
+ }
+
+ @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path
provided by test")
+ @Test
+ public void exceptionOnFileExistsTest() throws Exception {
+ CreateEmpty createEmpty = new CreateEmpty();
+
+ String wal1 = genFilename(tempDir.getAbsolutePath() + "/empty", ".wal");
+ String rf1 = genFilename(tempDir.getAbsolutePath() + "/empty", ".rf");
+
+ // create the file so it exists
+ File f = new File(wal1);
+ assertTrue(f.createNewFile());
+
+ String[] walArgs = {"--type", "WAL", wal1};
+ CreateEmpty.Opts walOpts = new CreateEmpty.Opts();
+ walOpts.parseArgs("accumulo create-empty", walArgs);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> createEmpty.createEmptyWal(walOpts, context));
+
+ // create the file so it exists
+ File f2 = new File(rf1);
+ assertTrue(f2.createNewFile());
+
+ String[] rfArgs = {"--type", "RF", rf1};
+ CreateEmpty.Opts rfOpts = new CreateEmpty.Opts();
+ rfOpts.parseArgs("accumulo create-empty", rfArgs);
+ assertThrows(IllegalArgumentException.class,
+ () -> createEmpty.createEmptyRFile(rfOpts, context));
+ }
+
+ @Test
+ public void createRfileTest() throws Exception {
+ CreateEmpty createEmpty = new CreateEmpty();
+
+ String file1 = genFilename(tempDir.getAbsolutePath() + "/empty", ".rf");
+ String file2 = genFilename(tempDir.getAbsolutePath() + "/empty", ".rf");
+
+ String[] args = {"--type", "RF", file1, file2};
+ CreateEmpty.Opts opts = new CreateEmpty.Opts();
+ opts.parseArgs("accumulo create-empty", args);
+
+ createEmpty.createEmptyRFile(opts, context);
+ VolumeManager vm = context.getVolumeManager();
+ assertTrue(vm.exists(new Path(file1)));
+ try (var scanner = RFile.newScanner().from(file1).build()) {
+ assertEquals(0, scanner.stream().count());
+ }
+
+ assertTrue(vm.exists(new Path(file2)));
+ try (var scanner = RFile.newScanner().from(file2).build()) {
+ assertEquals(0, scanner.stream().count());
+ }
+
+ }
+
+ /**
+ * Validate that the default type is RF (RecoveryWithEmptyRFileIT also needs this).
+ */
+ @Test
+ public void createRfileDefaultTest() throws Exception {
+ CreateEmpty createEmpty = new CreateEmpty();
+
+ String file1 = genFilename(tempDir.getAbsolutePath() + "/empty", ".rf");
+
+ String[] args = {file1};
+ CreateEmpty.Opts opts = new CreateEmpty.Opts();
+ opts.parseArgs("accumulo create-empty", args);
+
+ createEmpty.createEmptyRFile(opts, context);
+ VolumeManager vm = context.getVolumeManager();
+ assertTrue(vm.exists(new Path(file1)));
+ try (var scanner = RFile.newScanner().from(file1).build()) {
+ assertEquals(0, scanner.stream().count());
+ }
+ }
+
+ @Test
+ public void createWalTest() throws Exception {
+ CreateEmpty createEmpty = new CreateEmpty();
+
+ String file1 = genFilename(tempDir.getAbsolutePath() + "/empty", ".wal");
+ String file2 = genFilename(tempDir.getAbsolutePath() + "/empty", ".wal");
+
+ String[] args = {"--type", "WAL", file1, file2};
+ CreateEmpty.Opts opts = new CreateEmpty.Opts();
+ opts.parseArgs("accumulo create-empty", args);
+
+ createEmpty.createEmptyWal(opts, context);
+
+ checkWalContext(file1);
+ readLogFile(file1);
+
+ checkWalContext(file2);
+ }
+
+ /**
+ * Reads the log file and looks for specific information (crypto id, event
== OPEN)
+ */
+ private void checkWalContext(final String expected) throws IOException {
+ Path path = new Path(expected);
+ VolumeManager vm = context.getVolumeManager();
+ assertTrue(vm.exists(path));
+
+ try (InputStream inputStream = vm.open(path).getWrappedStream();
+ DataInputStream dis = new DataInputStream(inputStream)) {
+ byte[] headerBuf = new byte[1024];
+ int len = dis.read(headerBuf, 0, LOG_FILE_HEADER_V4.length());
+ assertEquals(LOG_FILE_HEADER_V4.length(), len);
+ assertEquals(LOG_FILE_HEADER_V4,
+ new String(headerBuf, 0, LOG_FILE_HEADER_V4.length(), UTF_8));
+
+ CryptoEnvironment env = new
CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL);
+ CryptoService cryptoService = context.getCryptoFactory().getService(env,
+ context.getConfiguration().getAllCryptoProperties());
+
+ byte[] decryptionParams =
cryptoService.getFileEncrypter(env).getDecryptionParameters();
+
+ var cryptParams = CryptoUtils.readParams(dis);
+ assertArrayEquals(decryptionParams, cryptParams);
+
+ LogFileKey key = new LogFileKey();
+ key.readFields(dis);
+
+ assertEquals(key.event, LogEvents.OPEN);
+ assertEquals("", key.tserverSession);
+ assertNull(key.filename);
+ }
+ }
+
+ /**
+ * Scan through log file and check that there is one event.
+ */
+ private void readLogFile(final String filename) throws Exception {
+ Path path = new Path(filename);
+ LogFileKey key = new LogFileKey();
+ LogFileValue value = new LogFileValue();
+
+ FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
+
+ CryptoEnvironment env = new
CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL);
+ CryptoService cryptoService = context.getCryptoFactory().getService(env,
+ context.getConfiguration().getAllCryptoProperties());
+
+ int eventCount = 0;
+ try (final FSDataInputStream fsinput = fs.open(path);
+ DataInputStream input = DfsLogger.getDecryptingStream(fsinput,
cryptoService)) {
+ while (true) {
+ try {
+ key.readFields(input);
+ value.readFields(input);
+ } catch (EOFException ex) {
+ break;
+ }
+ eventCount++;
+ }
+ } catch (DfsLogger.LogHeaderIncompleteException e) {
+ fail("Could not read header for " + path);
+ } finally {
+ // empty wal has 1 event (OPEN)
+ assertEquals(1, eventCount);
+ }
+ }
+
+ // tempDir is per test suite - generate a one-up count file for each call.
+ private static final AtomicInteger fileCount = new AtomicInteger(0);
+
+ private String genFilename(final String prefix, final String extension) {
+ return prefix + fileCount.incrementAndGet() + extension;
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
index 5524ae188f..c65cd0af5f 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
@@ -31,12 +31,12 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.rfile.CreateEmpty;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.tserver.util.CreateEmpty;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -95,6 +95,7 @@ public class RecoveryWithEmptyRFileIT extends
ConfigurableMacBase {
Path rfile =
StoredTabletFile.of(entry.getKey().getColumnQualifier()).getPath();
log.debug("Removing rfile '{}'", rfile);
cluster.getFileSystem().delete(rfile, false);
+ // the following depends on create-empty defaulting to rfile.
Process processInfo = cluster.exec(CreateEmpty.class,
rfile.toString()).getProcess();
assertEquals(0, processInfo.waitFor());
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
index bab15197eb..07d45a0238 100644
--- a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
@@ -39,7 +39,6 @@ import java.util.TreeMap;
import org.apache.accumulo.compactor.CompactorExecutable;
import org.apache.accumulo.coordinator.CoordinatorExecutable;
-import org.apache.accumulo.core.file.rfile.CreateEmpty;
import org.apache.accumulo.core.file.rfile.GenerateSplits;
import org.apache.accumulo.core.file.rfile.PrintInfo;
import org.apache.accumulo.core.file.rfile.SplitLarge;
@@ -72,6 +71,7 @@ import org.apache.accumulo.tserver.ScanServerExecutable;
import org.apache.accumulo.tserver.TServerExecutable;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.logger.LogReader;
+import org.apache.accumulo.tserver.util.CreateEmpty;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;