[ 
https://issues.apache.org/jira/browse/NIFI-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938611#comment-17938611
 ] 

Ruben Van Wanzeele commented on NIFI-14392:
-------------------------------------------

Attached you can find an attempt to resolve this issue by temporarily putting 
the MinimalLockingWriteAheadLog back to foresee a migration.
{code:java}
Subject: [PATCH] An attempt to provide a migration solution for the state 
provider.
---
Index: 
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
    (revision 671f07e444fd98d8f3ea88fc87a90c5c82f438db)
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
    (date 1742995213665)
@@ -17,23 +17,6 @@
 
 package org.apache.nifi.controller.state.providers.local;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
@@ -46,10 +29,13 @@
 import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wali.SerDe;
-import org.wali.SerDeFactory;
-import org.wali.UpdateType;
-import org.wali.WriteAheadRepository;
+import org.wali.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Provides state management for local (standalone) state, backed by a 
write-ahead log
@@ -112,7 +98,6 @@
         long checkpointIntervalMillis = 
context.getProperty(CHECKPOINT_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
         alwaysSync = context.getProperty(ALWAYS_SYNC).asBoolean();
 
-
         final File basePath = new File(context.getProperty(PATH).getValue());
 
         if (!basePath.exists() && !basePath.mkdirs()) {
@@ -134,6 +119,8 @@
         versionGenerator = new AtomicLong(-1L);
         writeAheadLog = new SequentialAccessWriteAheadLog<>(basePath, new 
SerdeFactory(serde));
 
+        migrateMinimalLockingToSequentialAccessIfRequired(basePath);
+
         final Collection<StateMapUpdate> updates = 
writeAheadLog.recoverRecords();
         long maxRecordVersion = EMPTY_VERSION;
 
@@ -160,6 +147,26 @@
         executor.scheduleWithFixedDelay(new CheckpointTask(), 
checkpointIntervalMillis, checkpointIntervalMillis, TimeUnit.MILLISECONDS);
     }
 
+    @SuppressWarnings("deprecation")
+    private void migrateMinimalLockingToSequentialAccessIfRequired(File 
basePath) throws IOException {
+        // Temporary code to migrate the previous WAL to the new WAL
+        if (basePath.toPath().resolve("partition-1").toFile().exists()) {
+            logger.info("Detected MinimalLockingWriteAheadLog for the state 
info, migrating to SequentialAccessWriteAheadLog");
+            // Just setting the partition count to 16 (the default), as it 
will detect the existing partitions
+            // automatically and use them instead if required.
+            MinimalLockingWriteAheadLog<StateMapUpdate> 
minimalLockingWriteAheadLog =
+                    new MinimalLockingWriteAheadLog<>(basePath.toPath(), 16, 
serde, null);
+            Collection<StateMapUpdate> records = 
minimalLockingWriteAheadLog.recoverRecords();
+            writeAheadLog.update(records, true);
+            logger.info("Migrated state info to 
SequentialAccessWriteAheadLog");
+            for (final File child : 
Objects.requireNonNull(basePath.listFiles())) {
+                if (child.isDirectory() && 
child.getName().startsWith("partition-")) {
+                    logger.info("Removing old partition directory {}", child);
+                }
+            }
+        }
+    }
+
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
Index: 
nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
new file mode 100644
--- /dev/null    (date 1742993032396)
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
    (date 1742993032396)
@@ -0,0 +1,1183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wali;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * This implementation provides as little Locking as possible in order to
+ * provide the highest throughput possible. However, this implementation is 
ONLY
+ * appropriate if it can be guaranteed that only a single thread will ever 
issue
+ * updates for a given Record at any one time.
+ * </p>
+ *
+ * @param <T> type of record this WAL is for
+ *
+ * @deprecated This implementation is now deprecated in favor of {@link 
SequentialAccessWriteAheadLog}.
+ *             This implementation, when given more than 1 partition, can have 
issues recovering after a sudden loss
+ *             of power or an operating system crash.
+ */
+@Deprecated
+public final class MinimalLockingWriteAheadLog<T> implements 
WriteAheadRepository<T> {
+
+    private final Path basePath;
+    private final Path partialPath;
+    private final Path snapshotPath;
+
+    private final SerDeFactory<T> serdeFactory;
+    private final SyncListener syncListener;
+    private final FileChannel lockChannel;
+    private final AtomicLong transactionIdGenerator = new AtomicLong(0L);
+
+    private final Partition<T>[] partitions;
+    private final AtomicLong partitionIndex = new AtomicLong(0L);
+    private final ConcurrentMap<Object, T> recordMap = new 
ConcurrentHashMap<>();
+    private final Map<Object, T> unmodifiableRecordMap = 
Collections.unmodifiableMap(recordMap);
+    private final Set<String> externalLocations = new CopyOnWriteArraySet<>();
+
+    private final Set<String> recoveredExternalLocations = new 
CopyOnWriteArraySet<>();
+
+    private final AtomicInteger numberBlackListedPartitions = new 
AtomicInteger(0);
+
+    private static final Logger logger = 
LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
+
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock(); // required to update a 
partition
+    private final Lock writeLock = rwLock.writeLock(); // required for 
checkpoint
+
+    private volatile boolean updated = false;
+    private volatile boolean recovered = false;
+
+    public MinimalLockingWriteAheadLog(final Path path, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws 
IOException {
+        this(new TreeSet<>(Collections.singleton(path)), partitionCount, new 
SingletonSerDeFactory<>(serde), syncListener);
+    }
+
+    public MinimalLockingWriteAheadLog(final Path path, final int 
partitionCount, final SerDeFactory<T> serdeFactory, final SyncListener 
syncListener) throws IOException {
+        this(new TreeSet<>(Collections.singleton(path)), partitionCount, 
serdeFactory, syncListener);
+    }
+
+    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws 
IOException {
+        this(paths, partitionCount, new SingletonSerDeFactory<>(serde), 
syncListener);
+    }
+
+    /**
+     *
+     * @param paths a sorted set of Paths to use for the partitions/journals 
and
+     * the snapshot. The snapshot will always be written to the first path
+     * specified.
+     * @param partitionCount the number of partitions/journals to use. For best
+     * performance, this should be close to the number of threads that are
+     * expected to update the repository simultaneously
+     * @param serdeFactory the factory for the serializer/deserializer for 
records
+     * @param syncListener the listener
+     * @throws IOException if unable to initialize due to IO issue
+     */
+    @SuppressWarnings("unchecked")
+    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int 
partitionCount, final SerDeFactory<T> serdeFactory, final SyncListener 
syncListener) throws IOException {
+        this.syncListener = syncListener;
+
+        requireNonNull(paths);
+        requireNonNull(serdeFactory);
+
+        if (paths.isEmpty()) {
+            throw new IllegalArgumentException("Paths must be non-empty");
+        }
+
+        int resolvedPartitionCount = partitionCount;
+        int existingPartitions = 0;
+        for (final Path path : paths) {
+            if (!Files.exists(path)) {
+                Files.createDirectories(path);
+            }
+
+            final File file = path.toFile();
+            if (!file.isDirectory()) {
+                throw new IOException("Path given [" + path + "] is not a 
directory");
+            }
+            if (!file.canWrite()) {
+                throw new IOException("Path given [" + path + "] is not 
writable");
+            }
+            if (!file.canRead()) {
+                throw new IOException("Path given [" + path + "] is not 
readable");
+            }
+            if (!file.canExecute()) {
+                throw new IOException("Path given [" + path + "] is not 
executable");
+            }
+
+            final File[] children = file.listFiles();
+            if (children != null) {
+                for (final File child : children) {
+                    if (child.isDirectory() && 
child.getName().startsWith("partition-")) {
+                        existingPartitions++;
+                    }
+                }
+
+                if (existingPartitions != 0 && existingPartitions != 
partitionCount) {
+                    logger.warn("Constructing MinimalLockingWriteAheadLog with 
partitionCount={}, but the repository currently has {} partitions; ignoring 
argument and proceeding with {} partitions",
+                            partitionCount, existingPartitions, 
existingPartitions);
+                    resolvedPartitionCount = existingPartitions;
+                }
+            }
+        }
+
+        this.basePath = paths.iterator().next();
+        this.partialPath = basePath.resolve("snapshot.partial");
+        this.snapshotPath = basePath.resolve("snapshot");
+        this.serdeFactory = serdeFactory;
+
+        final Path lockPath = basePath.resolve("wali.lock");
+        lockChannel = new FileOutputStream(lockPath.toFile()).getChannel();
+        lockChannel.lock();
+
+        partitions = new Partition[resolvedPartitionCount];
+
+        Iterator<Path> pathIterator = paths.iterator();
+        for (int i = 0; i < resolvedPartitionCount; i++) {
+            // If we're out of paths, create a new iterator to start over.
+            if (!pathIterator.hasNext()) {
+                pathIterator = paths.iterator();
+            }
+
+            final Path partitionBasePath = pathIterator.next();
+
+            partitions[i] = new 
Partition<>(partitionBasePath.resolve("partition-" + i), serdeFactory, i, 
getVersion());
+        }
+    }
+
+    @Override
+    public int update(final Collection<T> records, final boolean forceSync) 
throws IOException {
+        if (!recovered) {
+            throw new IllegalStateException("Cannot update repository until 
record recovery has been performed");
+        }
+
+        if (records.isEmpty()) {
+            return -1;
+        }
+
+        updated = true;
+        readLock.lock();
+        try {
+            while (true) {
+                final int numBlackListed = numberBlackListedPartitions.get();
+                if (numBlackListed >= partitions.length) {
+                    throw new IOException("All Partitions have been 
blacklisted due to "
+                            + "failures when attempting to update. If the 
Write-Ahead Log is able to perform a checkpoint, "
+                            + "this issue may resolve itself. Otherwise, 
manual intervention will be required.");
+                }
+
+                final long partitionIdx = partitionIndex.getAndIncrement();
+                final int resolvedIdx = (int) (partitionIdx % 
partitions.length);
+                final Partition<T> partition = partitions[resolvedIdx];
+                if (partition.tryClaim()) {
+                    try {
+                        final long transactionId = 
transactionIdGenerator.getAndIncrement();
+                        if (logger.isTraceEnabled()) {
+                            for (final T record : records) {
+                                logger.trace("Partition {} performing 
Transaction {}: {}", partition, transactionId, record);
+                            }
+                        }
+
+                        try {
+                            partition.update(records, transactionId, 
unmodifiableRecordMap, forceSync);
+                        } catch (final Throwable t) {
+                            partition.blackList();
+                            numberBlackListedPartitions.incrementAndGet();
+                            throw t;
+                        }
+
+                        if (forceSync && syncListener != null) {
+                            syncListener.onSync(resolvedIdx);
+                        }
+                    } finally {
+                        partition.releaseClaim();
+                    }
+
+                    for (final T record : records) {
+                        final UpdateType updateType = 
serdeFactory.getUpdateType(record);
+                        final Object recordIdentifier = 
serdeFactory.getRecordIdentifier(record);
+
+                        if (updateType == UpdateType.DELETE) {
+                            recordMap.remove(recordIdentifier);
+                        } else if (updateType == UpdateType.SWAP_OUT) {
+                            final String newLocation = 
serdeFactory.getLocation(record);
+                            if (newLocation == null) {
+                                logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_OUT but "
+                                        + "no indicator of where the Record is 
to be Swapped Out to; these records may be "
+                                        + "lost when the repository is 
restored!");
+                            } else {
+                                recordMap.remove(recordIdentifier);
+                                this.externalLocations.add(newLocation);
+                            }
+                        } else if (updateType == UpdateType.SWAP_IN) {
+                            final String newLocation = 
serdeFactory.getLocation(record);
+                            if (newLocation == null) {
+                                logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_IN but no "
+                                        + "indicator of where the Record is to 
be Swapped In from; these records may be duplicated "
+                                        + "when the repository is restored!");
+                            } else {
+                                externalLocations.remove(newLocation);
+                            }
+                            recordMap.put(recordIdentifier, record);
+                        } else {
+                            recordMap.put(recordIdentifier, record);
+                        }
+                    }
+
+                    return resolvedIdx;
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public Collection<T> recoverRecords() throws IOException {
+        if (updated) {
+            throw new IllegalStateException("Cannot recover records after 
updating the repository; must call recoverRecords first");
+        }
+
+        final long recoverStart = System.nanoTime();
+        writeLock.lock();
+        try {
+            Long maxTransactionId = recoverFromSnapshot(recordMap);
+            recoverFromEdits(recordMap, maxTransactionId);
+
+            for (final Partition<T> partition : partitions) {
+                final long transId = partition.getMaxRecoveredTransactionId();
+                if (maxTransactionId == null || transId > maxTransactionId) {
+                    maxTransactionId = transId;
+                }
+            }
+
+            this.transactionIdGenerator.set(maxTransactionId + 1);
+            this.externalLocations.addAll(recoveredExternalLocations);
+            logger.info("{} finished recovering records. Performing Checkpoint 
to ensure proper state of Partitions before updates", this);
+        } finally {
+            writeLock.unlock();
+        }
+        final long recoverNanos = System.nanoTime() - recoverStart;
+        final long recoveryMillis = 
TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
+        logger.info("Successfully recovered {} records in {} milliseconds", 
recordMap.size(), recoveryMillis);
+        checkpoint();
+
+        recovered = true;
+        return recordMap.values();
+    }
+
+    @Override
+    public Set<String> getRecoveredSwapLocations() throws IOException {
+        return recoveredExternalLocations;
+    }
+
+    private Long recoverFromSnapshot(final Map<Object, T> recordMap) throws 
IOException {
+        final boolean partialExists = Files.exists(partialPath);
+        final boolean snapshotExists = Files.exists(snapshotPath);
+
+        if (!partialExists && !snapshotExists) {
+            return null;
+        }
+
+        if (partialExists && snapshotExists) {
+            // both files exist -- assume we failed while checkpointing. Delete
+            // the partial file
+            Files.delete(partialPath);
+        } else if (partialExists) {
+            // partial exists but snapshot does not -- we must have completed
+            // creating the partial, deleted the snapshot
+            // but crashed before renaming the partial to the snapshot. Just
+            // rename partial to snapshot
+            Files.move(partialPath, snapshotPath);
+        }
+
+        if (Files.size(snapshotPath) == 0) {
+            logger.warn("{} Found 0-byte Snapshot file; skipping Snapshot file 
in recovery", this);
+            return null;
+        }
+
+        // at this point, we know the snapshotPath exists because if it 
didn't, then we either returned null
+        // or we renamed partialPath to snapshotPath. So just Recover from 
snapshotPath.
+        try (final DataInputStream dataIn = new DataInputStream(new 
BufferedInputStream(Files.newInputStream(snapshotPath, 
StandardOpenOption.READ)))) {
+            final String waliImplementationClass = dataIn.readUTF();
+            final int waliImplementationVersion = dataIn.readInt();
+
+            if 
(!waliImplementationClass.equals(MinimalLockingWriteAheadLog.class.getName())) {
+                throw new IOException("Write-Ahead Log located at " + 
snapshotPath + " was written using the "
+                        + waliImplementationClass + " class; cannot restore 
using " + getClass().getName());
+            }
+
+            if (waliImplementationVersion > getVersion()) {
+                throw new IOException("Write-Ahead Log located at " + 
snapshotPath + " was written using version "
+                        + waliImplementationVersion + " of the " + 
waliImplementationClass + " class; cannot restore using Version " + 
getVersion());
+            }
+
+            final String serdeEncoding = dataIn.readUTF(); // ignore serde 
class name for now
+            final int serdeVersion = dataIn.readInt();
+            final long maxTransactionId = dataIn.readLong();
+            final int numRecords = dataIn.readInt();
+
+            final SerDe<T> serde = serdeFactory.createSerDe(serdeEncoding);
+            serde.readHeader(dataIn);
+
+            for (int i = 0; i < numRecords; i++) {
+                final T record = serde.deserializeRecord(dataIn, serdeVersion);
+                if (record == null) {
+                    throw new EOFException();
+                }
+
+                final UpdateType updateType = serde.getUpdateType(record);
+                if (updateType == UpdateType.DELETE) {
+                    logger.warn("While recovering from snapshot, found record 
with type 'DELETE'; this record will not be restored");
+                    continue;
+                }
+
+                logger.trace("Recovered from snapshot: {}", record);
+                recordMap.put(serde.getRecordIdentifier(record), record);
+            }
+
+            final int numSwapRecords = dataIn.readInt();
+            final Set<String> swapLocations = new HashSet<>();
+            for (int i = 0; i < numSwapRecords; i++) {
+                swapLocations.add(dataIn.readUTF());
+            }
+            this.recoveredExternalLocations.addAll(swapLocations);
+
+            logger.debug("{} restored {} Records and {} Swap Files from 
Snapshot, ending with Transaction ID {}", this, numRecords, 
recoveredExternalLocations.size(), maxTransactionId);
+            return maxTransactionId;
+        }
+    }
+
+    /**
+     * Recovers records from the edit logs via the Partitions. Returns a 
boolean
+     * if recovery of a Partition requires the Write-Ahead Log be checkpointed
+     * before modification.
+     *
+     * @param modifiableRecordMap map
+     * @param maxTransactionIdRestored index of max restored transaction
+     * @throws IOException if unable to recover from edits
+     */
+    private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, 
final Long maxTransactionIdRestored) throws IOException {
+        final Map<Object, T> updateMap = new HashMap<>();
+        final Map<Object, T> unmodifiableRecordMap = 
Collections.unmodifiableMap(modifiableRecordMap);
+        final Map<Object, T> ignorableMap = new HashMap<>();
+        final Set<String> ignorableSwapLocations = new HashSet<>();
+
+        // populate a map of the next transaction id for each partition to the
+        // partition that has that next transaction id.
+        final SortedMap<Long, Partition<T>> transactionMap = new TreeMap<>();
+        for (final Partition<T> partition : partitions) {
+            Long transactionId;
+            boolean keepTransaction;
+            do {
+                transactionId = partition.getNextRecoverableTransactionId();
+
+                keepTransaction = transactionId == null || 
maxTransactionIdRestored == null || transactionId > maxTransactionIdRestored;
+                if (keepTransaction && transactionId != null) {
+                    // map this transaction id to its partition so that we can
+                    // start restoring transactions from this partition,
+                    // starting at 'transactionId'
+                    transactionMap.put(transactionId, partition);
+                } else if (transactionId != null) {
+                    // skip the next transaction, because our snapshot already
+                    // contained this transaction.
+                    try {
+                        partition.recoverNextTransaction(ignorableMap, 
updateMap, ignorableSwapLocations);
+                    } catch (final EOFException e) {
+                        logger.error("{} unexpectedly reached End of File 
while reading from {} for Transaction {}; assuming crash and ignoring this 
transaction.", this, partition, transactionId);
+                    }
+                }
+            } while (!keepTransaction);
+        }
+
+        while (!transactionMap.isEmpty()) {
+            final Map.Entry<Long, Partition<T>> firstEntry = 
transactionMap.entrySet().iterator().next();
+            final Long firstTransactionId = firstEntry.getKey();
+            final Partition<T> nextPartition = firstEntry.getValue();
+
+            try {
+                updateMap.clear();
+                final Set<Object> idsRemoved = 
nextPartition.recoverNextTransaction(unmodifiableRecordMap, updateMap, 
recoveredExternalLocations);
+                modifiableRecordMap.putAll(updateMap);
+                for (final Object id : idsRemoved) {
+                    modifiableRecordMap.remove(id);
+                }
+            } catch (final EOFException e) {
+                logger.error("{} unexpectedly reached End-of-File when reading 
from {} for Transaction ID {}; assuming crash and ignoring this transaction", 
this, nextPartition, firstTransactionId);
+            }
+
+            transactionMap.remove(firstTransactionId);
+
+            Long subsequentTransactionId = null;
+            try {
+                subsequentTransactionId = 
nextPartition.getNextRecoverableTransactionId();
+            } catch (final IOException e) {
+                logger.error("{} unexpectedly found End-of-File when reading 
from {} for Transaction ID {}; assuming crash and ignoring this transaction", 
this, nextPartition, firstTransactionId);
+            }
+
+            if (subsequentTransactionId != null) {
+                transactionMap.put(subsequentTransactionId, nextPartition);
+            }
+        }
+
+        for (final Partition<T> partition : partitions) {
+            partition.endRecovery();
+        }
+    }
+
+    @Override
+    public synchronized int checkpoint() throws IOException {
+        final Set<T> records;
+        final Set<String> swapLocations;
+        final long maxTransactionId;
+
+        final long startNanos = System.nanoTime();
+
+        FileOutputStream fileOut = null;
+        DataOutputStream dataOut = null;
+
+        long stopTheWorldNanos = -1L;
+        long stopTheWorldStart = -1L;
+        try {
+            final List<OutputStream> partitionStreams = new ArrayList<>();
+
+            writeLock.lock();
+            try {
+                stopTheWorldStart = System.nanoTime();
+                // stop the world while we make a copy of the records that must
+                // be checkpointed and rollover the partitions.
+                // We copy the records because serializing them is potentially
+                // very expensive, especially when we have hundreds
+                // of thousands or even millions of them. We don't want to
+                // prevent WALI from being used during this time.
+
+                // So the design is to copy all of the records, determine the
+                // last transaction ID that the records represent,
+                // and roll over the partitions to new write-ahead logs.
+                // Then, outside of the write lock, we will serialize the data
+                // to disk, and then remove the old Partition data.
+                records = new HashSet<>(recordMap.values());
+                maxTransactionId = transactionIdGenerator.get() - 1;
+
+                swapLocations = new HashSet<>(externalLocations);
+                for (final Partition<T> partition : partitions) {
+                    try {
+                        partitionStreams.add(partition.rollover());
+                    } catch (final Throwable t) {
+                        partition.blackList();
+                        numberBlackListedPartitions.getAndIncrement();
+                        throw t;
+                    }
+                }
+            } finally {
+                writeLock.unlock();
+            }
+
+            stopTheWorldNanos = System.nanoTime() - stopTheWorldStart;
+
+            // Close all of the Partitions' Output Streams. We do this here, 
instead of in Partition.rollover()
+            // because we want to do this outside of the write lock. Because 
calling close() on FileOutputStream can
+            // be very expensive, as it has to flush the data to disk, we 
don't want to prevent other Process Sessions
+            // from getting committed. Since rollover() transitions the 
partition to write to a new file already, there
+            // is no reason that we need to close this FileOutputStream before 
releasing the write lock. Also, if any Exception
+            // does get thrown when calling close(), we don't need to 
blacklist the partition, as the stream that was getting
+            // closed is not the stream being written to for the partition 
anyway. We also catch any IOException and wait until
+            // after we've attempted to close all streams before we throw an 
Exception, to avoid resource leaks if one of them
+            // is unable to be closed (due to out of storage space, for 
instance).
+            IOException failure = null;
+            for (final OutputStream partitionStream : partitionStreams) {
+                try {
+                    partitionStream.close();
+                } catch (final IOException e) {
+                    failure = e;
+                }
+            }
+            if (failure != null) {
+                throw failure;
+            }
+
+            // notify global sync with the write lock held. We do this because 
we don't want the repository to get updated
+            // while the listener is performing its necessary tasks
+            if (syncListener != null) {
+                syncListener.onGlobalSync();
+            }
+
+            final SerDe<T> serde = serdeFactory.createSerDe(null);
+
+            // perform checkpoint, writing to .partial file
+            fileOut = new FileOutputStream(partialPath.toFile());
+            dataOut = new DataOutputStream(new BufferedOutputStream(fileOut));
+            dataOut.writeUTF(MinimalLockingWriteAheadLog.class.getName());
+            dataOut.writeInt(getVersion());
+            dataOut.writeUTF(serde.getClass().getName());
+            dataOut.writeInt(serde.getVersion());
+            dataOut.writeLong(maxTransactionId);
+            dataOut.writeInt(records.size());
+            serde.writeHeader(dataOut);
+
+            for (final T record : records) {
+                logger.trace("Checkpointing {}", record);
+                serde.serializeRecord(record, dataOut);
+            }
+
+            dataOut.writeInt(swapLocations.size());
+            for (final String swapLocation : swapLocations) {
+                dataOut.writeUTF(swapLocation);
+            }
+        } finally {
+            if (dataOut != null) {
+                try {
+                    try {
+                        dataOut.flush();
+                        fileOut.getFD().sync();
+                    } finally {
+                        dataOut.close();
+                    }
+                } catch (final IOException e) {
+                    logger.warn("Failed to close Data Stream", e);
+                }
+            }
+        }
+
+        // delete the snapshot, if it exists, and rename the .partial to
+        // snapshot
+        Files.deleteIfExists(snapshotPath);
+        Files.move(partialPath, snapshotPath);
+
+        // clear all of the edit logs
+        final long partitionStart = System.nanoTime();
+        for (final Partition<T> partition : partitions) {
+            // we can call clearOld without claiming the partition because it
+            // does not change the partition's state
+            // and the only member variable it touches cannot be modified, 
other
+            // than when #rollover() is called.
+            // And since this method is the only one that calls #rollover() and
+            // this method is synchronized,
+            // the value of that member variable will not change. And it's
+            // volatile, so we will get the correct value.
+            partition.clearOld();
+        }
+        final long partitionEnd = System.nanoTime();
+        numberBlackListedPartitions.set(0);
+
+        final long endNanos = System.nanoTime();
+        final long millis = TimeUnit.MILLISECONDS.convert(endNanos - 
startNanos, TimeUnit.NANOSECONDS);
+        final long partitionMillis = 
TimeUnit.MILLISECONDS.convert(partitionEnd - partitionStart, 
TimeUnit.NANOSECONDS);
+        final long stopTheWorldMillis = 
TimeUnit.NANOSECONDS.toMillis(stopTheWorldNanos);
+
+        logger.info("{} checkpointed with {} Records and {} Swap Files in {} 
milliseconds (Stop-the-world time = {} milliseconds, Clear Edit Logs time = {} 
millis), max Transaction ID {}",
+                this, records.size(), swapLocations.size(), millis, 
stopTheWorldMillis, partitionMillis, maxTransactionId);
+
+        return records.size();
+    }
+
+    @Override
+    public void shutdown() throws IOException {
+        writeLock.lock();
+        try {
+            for (final Partition<T> partition : partitions) {
+                partition.close();
+            }
+        } finally {
+            writeLock.unlock();
+            lockChannel.close();
+
+            final File lockFile = new File(basePath.toFile(), "wali.lock");
+            lockFile.delete();
+        }
+    }
+
+    public int getVersion() {
+        return 1;
+    }
+
+    /**
+     * Represents a partition of this repository, which maps directly to a
+     * .journal file.
+     *
+     * All methods with the exceptions of {@link #claim()}, {@link 
#tryClaim()},
+     * and {@link #releaseClaim()} in this Partition MUST be called while
+     * holding the claim (via {@link #claim} or {@link #tryClaim()}).
+     *
+     * @param <S> type of record held in the partitions
+     */
+    private static class Partition<S> {
+        public static final String JOURNAL_EXTENSION = ".journal";
+        private static final int NUL_BYTE = 0;
+        private static final Pattern JOURNAL_FILENAME_PATTERN = 
Pattern.compile("\\d+\\.journal");
+
+        private final SerDeFactory<S> serdeFactory;
+        private SerDe<S> serde;
+
+        private final Path editDirectory;
+        private final int writeAheadLogVersion;
+
+        private DataOutputStream dataOut = null;
+        private FileOutputStream fileOut = null;
+        private volatile boolean blackListed = false;
+        private volatile boolean closed = false;
+        private DataInputStream recoveryIn;
+        private int recoveryVersion;
+        private String currentJournalFilename = "";
+
+        private static final byte TRANSACTION_CONTINUE = 1;
+        private static final byte TRANSACTION_COMMIT = 2;
+
+        private final String description;
+        private final AtomicLong maxTransactionId = new AtomicLong(-1L);
+        private final Logger logger = 
LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
+
+        private final Queue<Path> recoveryFiles;
+
+        public Partition(final Path path, final SerDeFactory<S> serdeFactory, 
final int partitionIndex, final int writeAheadLogVersion) throws IOException {
+            this.editDirectory = path;
+            this.serdeFactory = serdeFactory;
+
+            final File file = path.toFile();
+            if (!file.exists() && !file.mkdirs()) {
+                throw new IOException("Could not create directory " + 
file.getAbsolutePath());
+            }
+
+            this.recoveryFiles = new LinkedBlockingQueue<>();
+            for (final Path recoveryPath : getRecoveryPaths()) {
+                recoveryFiles.add(recoveryPath);
+            }
+
+            this.description = "Partition-" + partitionIndex;
+            this.writeAheadLogVersion = writeAheadLogVersion;
+        }
+
+        public boolean tryClaim() {
+            return !blackListed;
+        }
+
+        public void releaseClaim() {
+        }
+
+        public void close() {
+            this.closed = true;
+
+            // Note that here we are closing fileOut and NOT dataOut.
+            // This is very much intentional, not an oversight. This is done 
because of
+            // the way that the OutputStreams are structured. dataOut wraps a 
BufferedOutputStream,
+            // which then wraps the FileOutputStream. If we close 'dataOut', 
then this will call
+            // the flush() method of BufferedOutputStream. Under normal 
conditions, this is fine.
+            // However, there is a very important corner case to consider:
+            //
+            //      If we are writing to the DataOutputStream in the update() 
method and that
+            //      call to write() then results in the BufferedOutputStream 
calling flushBuffer() -
+            //      or if we finish the call to update() and call flush() 
ourselves - it is possible
+            //      that the internal buffer of the BufferedOutputStream can 
get partially written to
+            //      to the FileOutputStream and then an IOException occurs. If 
this occurs, we have
+            //      written a partial record to disk. This still is okay, as 
we have logic to handle
+            //      the condition where we have a partial record and then an 
unexpected End-of-File.
+            //      But if we then call close() on 'dataOut', this will call 
the flush() method of the
+            //      underlying BufferedOutputStream. As a result, we will end 
up again writing the internal
+            //      buffer of the BufferedOutputStream to the underlying file. 
At this point, we are left
+            //      not with an unexpected/premature End-of-File but instead a 
bunch of seemingly random
+            //      bytes that happened to be residing in that internal 
buffer, and this will result in
+            //      a corrupt and unrecoverable Write-Ahead Log.
+            //
+            // Additionally, we are okay not ever calling close on the 
wrapping BufferedOutputStream and
+            // DataOutputStream because they don't actually hold any resources 
that need to be reclaimed,
+            // and after each update to the Write-Ahead Log, we call flush() 
ourselves to ensure that we don't
+            // leave arbitrary data in the BufferedOutputStream that hasn't 
been flushed to the underlying
+            // FileOutputStream.
+            final OutputStream out = fileOut;
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (final Exception e) {
+                }
+            }
+
+            this.dataOut = null;
+            this.fileOut = null;
+        }
+
+        public void blackList() {
+            blackListed = true;
+            logger.debug("Blacklisted {}", this);
+        }
+
+        /**
+         * Closes resources pointing to the current journal and begins writing
+         * to a new one
+         *
+         * @throws IOException if failure to rollover
+         */
+        public OutputStream rollover() throws IOException {
+            // Note that here we are closing fileOut and NOT dataOut. See the 
note in the close()
+            // method to understand the logic behind this.
+            final OutputStream oldOutputStream = fileOut;
+            dataOut = null;
+            fileOut = null;
+
+            this.serde = serdeFactory.createSerDe(null);
+            final Path editPath = getNewEditPath();
+            final FileOutputStream fos = new 
FileOutputStream(editPath.toFile());
+            try {
+                final DataOutputStream outStream = new DataOutputStream(new 
BufferedOutputStream(fos));
+                
outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
+                outStream.writeInt(writeAheadLogVersion);
+                outStream.writeUTF(serde.getClass().getName());
+                outStream.writeInt(serde.getVersion());
+                serde.writeHeader(outStream);
+
+                outStream.flush();
+                dataOut = outStream;
+                fileOut = fos;
+            } catch (final IOException ioe) {
+                try {
+                    oldOutputStream.close();
+                } catch (final IOException ioe2) {
+                    ioe.addSuppressed(ioe2);
+                }
+
+                logger.error("Failed to create new journal for {}", this, ioe);
+                try {
+                    fos.close();
+                } catch (final IOException innerIOE) {
+                }
+
+                dataOut = null;
+                fileOut = null;
+                blackList();
+
+                throw ioe;
+            }
+
+            currentJournalFilename = editPath.toFile().getName();
+
+            blackListed = false;
+            return oldOutputStream;
+        }
+
+        private long getJournalIndex(final File file) {
+            final String filename = file.getName();
+            final int dotIndex = filename.indexOf(".");
+            final String number = filename.substring(0, dotIndex);
+            return Long.parseLong(number);
+        }
+
+        private Path getNewEditPath() {
+            final List<Path> recoveryPaths = getRecoveryPaths();
+            final long newIndex;
+            if (recoveryPaths == null || recoveryPaths.isEmpty()) {
+                newIndex = 1;
+            } else {
+                final long lastFileIndex = 
getJournalIndex(recoveryPaths.get(recoveryPaths.size() - 1).toFile());
+                newIndex = lastFileIndex + 1;
+            }
+
+            return editDirectory.resolve(newIndex + JOURNAL_EXTENSION);
+        }
+
+        private List<Path> getRecoveryPaths() {
+            final List<Path> paths = new ArrayList<>();
+
+            final File directory = editDirectory.toFile();
+            final File[] partitionFiles = directory.listFiles();
+            if (partitionFiles == null) {
+                return paths;
+            }
+
+            for (final File file : partitionFiles) {
+                // if file is a journal file but no data has yet been 
persisted, it may
+                // very well be a 0-byte file (the journal is not SYNC'ed to 
disk after
+                // a header is written out, so it may be lost). In this case, 
the journal
+                // is empty, so we can just skip it.
+                if (file.isDirectory() || file.length() == 0L) {
+                    continue;
+                }
+
+                if 
(!JOURNAL_FILENAME_PATTERN.matcher(file.getName()).matches()) {
+                    continue;
+                }
+
+                if (isJournalFile(file)) {
+                    paths.add(file.toPath());
+                } else {
+                    logger.warn("Found file {}, but could not access it, or it 
was not in the expected format; "
+                            + "will ignore this file", file.getAbsolutePath());
+                }
+            }
+
+            // Sort journal files by the numeric portion of the filename
+            Collections.sort(paths, new Comparator<Path>() {
+                @Override
+                public int compare(final Path o1, final Path o2) {
+                    if (o1 == null && o2 == null) {
+                        return 0;
+                    }
+                    if (o1 == null) {
+                        return 1;
+                    }
+                    if (o2 == null) {
+                        return -1;
+                    }
+
+                    final long index1 = getJournalIndex(o1.toFile());
+                    final long index2 = getJournalIndex(o2.toFile());
+                    return Long.compare(index1, index2);
+                }
+            });
+
+            return paths;
+        }
+
+        void clearOld() {
+            final List<Path> oldRecoveryFiles = getRecoveryPaths();
+
+            for (final Path path : oldRecoveryFiles) {
+                final File file = path.toFile();
+                if (file.getName().equals(currentJournalFilename)) {
+                    continue;
+                }
+                if (file.exists()) {
+                    file.delete();
+                }
+            }
+        }
+
+        private boolean isJournalFile(final File file) {
+            final String expectedStartsWith = 
MinimalLockingWriteAheadLog.class.getName();
+            try {
+                try (final FileInputStream fis = new FileInputStream(file);
+                        final InputStream bufferedIn = new 
BufferedInputStream(fis);
+                        final DataInputStream in = new 
DataInputStream(bufferedIn)) {
+                    final String waliImplClassName = in.readUTF();
+                    if (!expectedStartsWith.equals(waliImplClassName)) {
+                        return false;
+                    }
+                }
+            } catch (final IOException e) {
+                return false;
+            }
+
+            return true;
+        }
+
+        public void update(final Collection<S> records, final long 
transactionId, final Map<Object, S> recordMap, final boolean forceSync) throws 
IOException {
+            try (final ByteArrayOutputStream baos = new 
ByteArrayOutputStream(256);
+                final DataOutputStream out = new DataOutputStream(baos)) {
+
+                out.writeLong(transactionId);
+                final int numEditsToSerialize = records.size();
+                int editsSerialized = 0;
+                for (final S record : records) {
+                    final Object recordId = serde.getRecordIdentifier(record);
+                    final S previousVersion = recordMap.get(recordId);
+
+                    serde.serializeEdit(previousVersion, record, out);
+                    if (++editsSerialized < numEditsToSerialize) {
+                        out.write(TRANSACTION_CONTINUE);
+                    } else {
+                        out.write(TRANSACTION_COMMIT);
+                    }
+                }
+
+                out.flush();
+
+                if (this.closed) {
+                    throw new IllegalStateException("Partition is closed");
+                }
+
+                baos.writeTo(dataOut);
+                dataOut.flush();
+
+                if (forceSync) {
+                    synchronized (fileOut) {
+                        fileOut.getFD().sync();
+                    }
+                }
+            }
+        }
+
+        private DataInputStream createDataInputStream(final Path path) throws 
IOException {
+            return new DataInputStream(new 
BufferedInputStream(Files.newInputStream(path)));
+        }
+
+        private DataInputStream getRecoveryStream() throws IOException {
+            if (recoveryIn != null && hasMoreData(recoveryIn)) {
+                return recoveryIn;
+            }
+
+            while (true) {
+                final Path nextRecoveryPath = recoveryFiles.poll();
+                if (nextRecoveryPath == null) {
+                    return null;
+                }
+
+                logger.debug("{} recovering from {}", this, nextRecoveryPath);
+                recoveryIn = createDataInputStream(nextRecoveryPath);
+                if (hasMoreData(recoveryIn)) {
+                    try {
+                        final String waliImplementationClass = 
recoveryIn.readUTF();
+                        if 
(!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
+                            continue;
+                        }
+
+                        final long waliVersion = recoveryIn.readInt();
+                        if (waliVersion > writeAheadLogVersion) {
+                            throw new IOException("Cannot recovery from file " 
+ nextRecoveryPath + " because it was written using "
+                                + "WALI version " + waliVersion + ", but the 
version used to restore it is only " + writeAheadLogVersion);
+                        }
+
+                        final String serdeEncoding = recoveryIn.readUTF();
+                        this.recoveryVersion = recoveryIn.readInt();
+                        serde = serdeFactory.createSerDe(serdeEncoding);
+
+                        serde.readHeader(recoveryIn);
+                        break;
+                    } catch (final Exception e) {
+                        logger.warn("Failed to recover data from Write-Ahead 
Log for {} because the header information could not be read properly. "
+                            + "This often is the result of the file not being 
fully written out before the application is restarted. This file will be 
ignored.", nextRecoveryPath);
+                    }
+                }
+            }
+
+            return recoveryIn;
+        }
+
+        public Long getNextRecoverableTransactionId() throws IOException {
+            while (true) {
+                DataInputStream recoveryStream = getRecoveryStream();
+                if (recoveryStream == null) {
+                    return null;
+                }
+
+                final long transactionId;
+                try {
+                    transactionId = recoveryIn.readLong();
+                } catch (final EOFException e) {
+                    continue;
+                } catch (final Exception e) {
+                    // If the stream consists solely of NUL bytes, then we 
want to treat it
+                    // the same as an EOF because we see this happen when we 
suddenly lose power
+                    // while writing to a file.
+                    if (remainingBytesAllNul(recoveryIn)) {
+                        logger.warn("Failed to recover data from Write-Ahead 
Log Partition because encountered trailing NUL bytes. "
+                            + "This will sometimes happen after a sudden power 
loss. The rest of this journal file will be skipped for recovery purposes.");
+                        continue;
+                    } else {
+                        throw e;
+                    }
+                }
+
+                this.maxTransactionId.set(transactionId);
+                return transactionId;
+            }
+        }
+
+        /**
+         * In the case of a sudden power loss, it is common - at least in a 
Linux journaling File System -
+         * that the partition file that is being written to will have many 
trailing "NUL bytes" (0's).
+         * If this happens, then on restart we want to treat this as an 
incomplete transaction, so we detect
+         * this case explicitly.
+         *
+         * @param in the input stream to scan
+         * @return <code>true</code> if the InputStream contains no data or 
contains only NUL bytes
+         * @throws IOException if unable to read from the given InputStream
+         */
+        private boolean remainingBytesAllNul(final InputStream in) throws 
IOException {
+            int nextByte;
+            while ((nextByte = in.read()) != -1) {
+                if (nextByte != NUL_BYTE) {
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
+        private boolean hasMoreData(final InputStream in) throws IOException {
+            in.mark(1);
+            final int nextByte = in.read();
+            in.reset();
+            return nextByte >= 0;
+        }
+
+        public void endRecovery() throws IOException {
+            if (recoveryIn != null) {
+                recoveryIn.close();
+            }
+
+            final Path nextRecoveryPath = this.recoveryFiles.poll();
+            if (nextRecoveryPath != null) {
+                throw new IllegalStateException("Signaled to end recovery, but 
there are more recovery files for Partition "
+                        + "in directory " + editDirectory);
+            }
+
+            final Path newEditPath = getNewEditPath();
+
+            this.serde = serdeFactory.createSerDe(null);
+            final FileOutputStream fos = new 
FileOutputStream(newEditPath.toFile());
+            final DataOutputStream outStream = new DataOutputStream(new 
BufferedOutputStream(fos));
+            outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
+            outStream.writeInt(writeAheadLogVersion);
+            outStream.writeUTF(serde.getClass().getName());
+            outStream.writeInt(serde.getVersion());
+            serde.writeHeader(outStream);
+
+            outStream.flush();
+            dataOut = outStream;
+            fileOut = fos;
+        }
+
+        public Set<Object> recoverNextTransaction(final Map<Object, S> 
currentRecordMap, final Map<Object, S> updatedRecordMap, final Set<String> 
swapLocations) throws IOException {
+            final Set<Object> idsRemoved = new HashSet<>();
+
+            int transactionFlag;
+            do {
+                final S record;
+                try {
+                    record = serde.deserializeEdit(recoveryIn, 
currentRecordMap, recoveryVersion);
+                    if (record == null) {
+                        throw new EOFException();
+                    }
+                } catch (final EOFException eof) {
+                    throw eof;
+                } catch (final Exception e) {
+                    // If the stream consists solely of NUL bytes, then we 
want to treat it
+                    // the same as an EOF because we see this happen when we 
suddenly lose power
+                    // while writing to a file. We also have logic already in 
the caller of this
+                    // method to properly handle EOFException's, so we will 
simply throw an EOFException
+                    // ourselves. However, if that is not the case, then 
something else has gone wrong.
+                    // In such a case, there is not much that we can do. If we 
simply skip over the transaction,
+                    // then the transaction may be indicating that a new 
attribute was added or changed. Or the
+                    // content of the FlowFile changed. A subsequent 
transaction for the same FlowFile may then
+                    // update the connection that is holding the FlowFile. In 
this case, if we simply skip over
+                    // the transaction, we end up with a FlowFile in a queue 
that has the wrong attributes or
+                    // content, and that can result in some very bad behavior 
- even security vulnerabilities if
+                    // a Route processor, for instance, routes incorrectly due 
to a missing attribute or content
+                    // is pointing to a previous claim where sensitive values 
have not been removed, etc. So
+                    // instead of attempting to skip the transaction and move 
on, we instead just throw the Exception
+                    // indicating that the write-ahead log is corrupt and 
allow the user to handle it as he/she sees
+                    // fit (likely this will result in deleting the repo, but 
it's possible that it could be repaired
+                    // manually or through some sort of script).
+                    if (remainingBytesAllNul(recoveryIn)) {
+                        final EOFException eof = new EOFException("Failed to 
recover data from Write-Ahead Log Partition because encountered trailing NUL 
bytes. "
+                            + "This will sometimes happen after a sudden power 
loss. The rest of this journal file will be skipped for recovery purposes.");
+                        eof.addSuppressed(e);
+                        throw eof;
+                    } else {
+                        throw e;
+                    }
+                }
+
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("{} Recovering Transaction {}: {}", this, 
maxTransactionId.get(), record);
+                }
+
+                final Object recordId = serde.getRecordIdentifier(record);
+                final UpdateType updateType = serde.getUpdateType(record);
+                if (updateType == UpdateType.DELETE) {
+                    updatedRecordMap.remove(recordId);
+                    idsRemoved.add(recordId);
+                } else if (updateType == UpdateType.SWAP_IN) {
+                    final String location = serde.getLocation(record);
+                    if (location == null) {
+                        logger.error("Recovered SWAP_IN record from edit log, 
but it did not contain a Location; skipping record");
+                    } else {
+                        swapLocations.remove(location);
+                        updatedRecordMap.put(recordId, record);
+                        idsRemoved.remove(recordId);
+                    }
+                } else if (updateType == UpdateType.SWAP_OUT) {
+                    final String location = serde.getLocation(record);
+                    if (location == null) {
+                        logger.error("Recovered SWAP_OUT record from edit log, 
but it did not contain a Location; skipping record");
+                    } else {
+                        swapLocations.add(location);
+                        updatedRecordMap.remove(recordId);
+                        idsRemoved.add(recordId);
+                    }
+                } else {
+                    updatedRecordMap.put(recordId, record);
+                    idsRemoved.remove(recordId);
+                }
+
+                transactionFlag = recoveryIn.read();
+            } while (transactionFlag != TRANSACTION_COMMIT);
+
+            return idsRemoved;
+        }
+
+        /**
+         * Must be called after recovery has finished
+         *
+         * @return max recovered transaction id
+         */
+        public long getMaxRecoveredTransactionId() {
+            return maxTransactionId.get();
+        }
+
+        @Override
+        public String toString() {
+            return description;
+        }
+    }
+}
 {code}

> Local state lost when migrating from 1.28 to 2.3
> ------------------------------------------------
>
>                 Key: NIFI-14392
>                 URL: https://issues.apache.org/jira/browse/NIFI-14392
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 2.0.0-M1, 2.0.0-M2, 2.0.0-M3, 2.0.0-M4, 2.0.0, 2.1.0, 
> 2.2.0, 2.3.0
>            Reporter: Ruben Van Wanzeele
>            Priority: Major
>
> We experienced an issue with the state during migration
> Our local state-management configuration looks as follows:
> {code:java}
> <local-provider>
>   <id>local-provider</id>
>   
> <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
>   <property name="Directory">./state/local</property>
>   <property name="Always Sync">false</property>
>   <property name="Partitions">16</property>
>   <property name="Checkpoint Interval">2 mins</property>
> </local-provider> {code}
> And before migration we have our state details correctly in the expected 
> directory:
> {code:java}
> ls -la ./state/local/
> total 92
> drwxrwsr-x. 18 nifi nifi  4096 Mar 26 10:25 .
> drwxrwsr-x.  8 root nifi  4096 Jan 17 02:14 ..
> drwxrwsr-x.  2 nifi nifi  4096 Mar 26 10:25 partition-0
> drwxrwsr-x.  2 nifi nifi  4096 Mar 26 10:25 partition-1
> drwxrwsr-x.  2 nifi nifi  4096 Mar 26 10:25 partition-10
> drwxrwsr-x.  2 nifi nifi  4096 Mar 26 10:25 partition-11
> ... 
> drwxrwsr-x.  2 nifi nifi  4096 Mar 26 10:25 partition-8
> drwxrwsr-x.  2 nifi nifi  4096 Mar 26 10:25 partition-9
> -rw-r--r--.  1 nifi nifi 20448 Mar 26 10:25 snapshot
> -rw-r--r--.  1 nifi nifi     0 Feb 19 21:28 wali.lock{code}
> On a new environment with the same state-management configuration I see that 
> the state information is stored in ./state/local/journals directory:
> {code:java}
> ls -la ./state/local/
> total 16
> drwxrwsr-x 3 nifi nifi 4096 Mar 26 10:23 .
> drwxrwsr-x 9 root nifi 4096 Mar 24 12:27 ..
> -rw-r--r-- 1 nifi nifi 2443 Mar 26 10:23 checkpoint
> drwxrwsr-x 2 nifi nifi 4096 Mar 26 10:23 journals {code}
> On a migrated environment I find both directory structures, and {*}the state 
> from before 2.3 is not maintained{*}.
> When searching the web, I found that this issue was already alerted, but not 
> yet officially reported:
> https://lists.apache.org/thread/5v59hbp18rpsboqc2q6f79h3b64803om



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to