This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new fd111a22a6 Ensure resources are closed in GarbageCollectWriteAheadLogs (#4790)
fd111a22a6 is described below
commit fd111a22a6454837ad6076d5ba6fe54fc8934be5
Author: Dom G. <[email protected]>
AuthorDate: Fri Aug 16 15:48:58 2024 -0400
Ensure resources are closed in GarbageCollectWriteAheadLogs (#4790)
* Ensure resources are closed in GarbageCollectWriteAheadLogs
* Adds Stream<TabletLocationState> stream() method to TabletStateStore that
closes the underlying iterator when the stream is closed
---------
Co-authored-by: Christopher Tubbs <[email protected]>
---
.../server/manager/state/TabletStateStore.java | 22 +++++
.../accumulo/gc/GarbageCollectWriteAheadLogs.java | 97 +++++++++++++---------
.../gc/GarbageCollectWriteAheadLogsTest.java | 34 ++++----
3 files changed, 97 insertions(+), 56 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
index 05072fd1b0..1894f7e7c7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
@@ -18,10 +18,16 @@
*/
package org.apache.accumulo.server.manager.state;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -52,6 +58,22 @@ public interface TabletStateStore extends Iterable<TabletLocationState> {
@Override
ClosableIterator<TabletLocationState> iterator();
+ /**
+ * Create a stream of TabletLocationState that automatically closes the
underlying iterator.
+ */
+ default Stream<TabletLocationState> stream() {
+ ClosableIterator<TabletLocationState> iterator = this.iterator();
+ return StreamSupport
+ .stream(Spliterators.spliteratorUnknownSize(iterator,
Spliterator.ORDERED), false)
+ .onClose(() -> {
+ try {
+ iterator.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ }
+
/**
* Store the assigned locations in the data store.
*/
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index ecfaeaba66..0fe4843ad1 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -28,6 +28,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -57,8 +59,8 @@ import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterators;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Streams;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
@@ -71,7 +73,8 @@ public class GarbageCollectWriteAheadLogs {
private final boolean useTrash;
private final LiveTServerSet liveServers;
private final WalStateManager walMarker;
- private final Iterable<TabletLocationState> store;
+ private final AtomicBoolean hasCollected = new AtomicBoolean(false);
+ private final Stream<TabletLocationState> store;
/**
* Creates a new GC WAL object.
@@ -82,38 +85,51 @@ public class GarbageCollectWriteAheadLogs {
*/
GarbageCollectWriteAheadLogs(final ServerContext context, final
VolumeManager fs,
final LiveTServerSet liveServers, boolean useTrash) {
- this.context = context;
- this.fs = fs;
- this.useTrash = useTrash;
- this.liveServers = liveServers;
- this.walMarker = new WalStateManager(context);
- this.store = () -> Iterators.concat(
- TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).iterator(),
- TabletStateStore.getStoreForLevel(DataLevel.METADATA,
context).iterator(),
- TabletStateStore.getStoreForLevel(DataLevel.USER, context).iterator());
+ this(context, fs, liveServers, useTrash, new WalStateManager(context),
createStore(context));
}
/**
- * Creates a new GC WAL object. Meant for testing -- allows mocked objects.
+ * Creates a new GC WAL object. Meant for testing -- allows for mocked
objects.
+ *
*
* @param context the collection server's context
* @param fs volume manager to use
+ * @param liveServers a started LiveTServerSet instance
* @param useTrash true to move files to trash rather than delete them
- * @param liveTServerSet a started LiveTServerSet instance
+ * @param walMarker a WalStateManager instance
+ * @param store a stream of TabletLocationState objects
*/
- @VisibleForTesting
- GarbageCollectWriteAheadLogs(ServerContext context, VolumeManager fs,
boolean useTrash,
- LiveTServerSet liveTServerSet, WalStateManager walMarker,
- Iterable<TabletLocationState> store) {
+ GarbageCollectWriteAheadLogs(final ServerContext context, final
VolumeManager fs,
+ final LiveTServerSet liveServers, boolean useTrash, final
WalStateManager walMarker,
+ final Stream<TabletLocationState> store) {
this.context = context;
this.fs = fs;
this.useTrash = useTrash;
- this.liveServers = liveTServerSet;
+ this.liveServers = liveServers;
this.walMarker = walMarker;
this.store = store;
}
+ private static Stream<TabletLocationState> createStore(final ServerContext
context) {
+ var rootStream = TabletStateStore.getStoreForLevel(DataLevel.ROOT,
context).stream();
+ var metadataStream = TabletStateStore.getStoreForLevel(DataLevel.METADATA,
context).stream();
+ var userStream = TabletStateStore.getStoreForLevel(DataLevel.USER,
context).stream();
+ return Streams.concat(rootStream, metadataStream, userStream).onClose(()
-> {
+ try {
+ rootStream.close();
+ } finally {
+ try {
+ metadataStream.close();
+ } finally {
+ userStream.close();
+ }
+ }
+ });
+ }
+
public void collect(GCStatus status) {
+ Preconditions.checkState(hasCollected.compareAndSet(false, true),
+ "collect() has already been called on this object (which should only
be called once)");
try {
long count;
long fileScanStop;
@@ -216,7 +232,6 @@ public class GarbageCollectWriteAheadLogs {
} finally {
span5.end();
}
-
} catch (Exception e) {
log.error("exception occurred while garbage collecting write ahead
logs", e);
} finally {
@@ -302,30 +317,34 @@ public class GarbageCollectWriteAheadLogs {
}
// remove any entries if there's a log reference (recovery hasn't finished)
- for (TabletLocationState state : store) {
- // Tablet is still assigned to a dead server. Manager has moved markers
and reassigned it
- // Easiest to just ignore all the WALs for the dead server.
- if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
- Set<UUID> idsToIgnore =
candidates.remove(state.current.getServerInstance());
- if (idsToIgnore != null) {
- result.keySet().removeAll(idsToIgnore);
- recoveryLogs.keySet().removeAll(idsToIgnore);
- }
- }
- // Tablet is being recovered and has WAL references, remove all the WALs
for the dead server
- // that made the WALs.
- for (Collection<String> wals : state.walogs) {
- for (String wal : wals) {
- UUID walUUID = path2uuid(new Path(wal));
- TServerInstance dead = result.get(walUUID);
- // There's a reference to a log file, so skip that server's logs
- Set<UUID> idsToIgnore = candidates.remove(dead);
+ try {
+ store.forEach(state -> {
+ // Tablet is still assigned to a dead server. Manager has moved
markers and reassigned it
+ // Easiest to just ignore all the WALs for the dead server.
+ if (state.getState(liveServers) ==
TabletState.ASSIGNED_TO_DEAD_SERVER) {
+ Set<UUID> idsToIgnore =
candidates.remove(state.current.getServerInstance());
if (idsToIgnore != null) {
result.keySet().removeAll(idsToIgnore);
recoveryLogs.keySet().removeAll(idsToIgnore);
}
}
- }
+ // Tablet is being recovered and has WAL references, remove all the
WALs for the dead server
+ // that made the WALs.
+ for (Collection<String> wals : state.walogs) {
+ for (String wal : wals) {
+ UUID walUUID = path2uuid(new Path(wal));
+ TServerInstance dead = result.get(walUUID);
+ // There's a reference to a log file, so skip that server's logs
+ Set<UUID> idsToIgnore = candidates.remove(dead);
+ if (idsToIgnore != null) {
+ result.keySet().removeAll(idsToIgnore);
+ recoveryLogs.keySet().removeAll(idsToIgnore);
+ }
+ }
+ }
+ });
+ } finally {
+ store.close();
}
// Remove OPEN and CLOSED logs for live servers: they are still in use
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index d9c69da11f..544407b495 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
+import java.util.stream.Stream;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
@@ -83,12 +84,11 @@ public class GarbageCollectWriteAheadLogsTest {
}
}
- private final Iterable<TabletLocationState> tabletOnServer1List =
- Collections.singletonList(tabletAssignedToServer1);
- private final Iterable<TabletLocationState> tabletOnServer2List =
- Collections.singletonList(tabletAssignedToServer2);
- private final List<Entry<Key,Value>> emptyList = Collections.emptyList();
- private final Iterator<Entry<Key,Value>> emptyKV = emptyList.iterator();
+ private final Stream<TabletLocationState> tabletOnServer1List =
+ Stream.of(tabletAssignedToServer1);
+ private final Stream<TabletLocationState> tabletOnServer2List =
+ Stream.of(tabletAssignedToServer2);
+ private final Iterator<Entry<Key,Value>> emptyKV =
Collections.emptyIterator();
@Test
public void testRemoveUnusedLog() throws Exception {
@@ -109,8 +109,8 @@ public class GarbageCollectWriteAheadLogsTest {
marker.removeWalMarker(server1, id);
EasyMock.expectLastCall().once();
EasyMock.replay(context, fs, marker, tserverSet);
- GarbageCollectWriteAheadLogs gc = new
GarbageCollectWriteAheadLogs(context, fs, false,
- tserverSet, marker, tabletOnServer1List) {
+ var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false,
marker,
+ tabletOnServer1List) {
@Override
@Deprecated
protected int removeReplicationEntries(Map<UUID,TServerInstance>
candidates) {
@@ -142,8 +142,8 @@ public class GarbageCollectWriteAheadLogsTest {
EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
EasyMock.expect(marker.state(server1, id)).andReturn(new
Pair<>(WalState.CLOSED, path));
EasyMock.replay(context, marker, tserverSet, fs);
- GarbageCollectWriteAheadLogs gc = new
GarbageCollectWriteAheadLogs(context, fs, false,
- tserverSet, marker, tabletOnServer1List) {
+ var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false,
marker,
+ tabletOnServer1List) {
@Override
@Deprecated
protected int removeReplicationEntries(Map<UUID,TServerInstance>
candidates) {
@@ -160,7 +160,7 @@ public class GarbageCollectWriteAheadLogsTest {
}
@Test
- public void deleteUnreferenceLogOnDeadServer() throws Exception {
+ public void deleteUnreferencedLogOnDeadServer() throws Exception {
ServerContext context = EasyMock.createMock(ServerContext.class);
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -196,8 +196,8 @@ public class GarbageCollectWriteAheadLogsTest {
marker.forget(server2);
EasyMock.expectLastCall().once();
EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
- GarbageCollectWriteAheadLogs gc = new
GarbageCollectWriteAheadLogs(context, fs, false,
- tserverSet, marker, tabletOnServer1List) {
+ var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false,
marker,
+ tabletOnServer1List) {
@Override
protected Map<UUID,Path> getSortedWALogs() {
return Collections.emptyMap();
@@ -239,8 +239,8 @@ public class GarbageCollectWriteAheadLogsTest {
EasyMock.expectLastCall().once();
EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
- GarbageCollectWriteAheadLogs gc = new
GarbageCollectWriteAheadLogs(context, fs, false,
- tserverSet, marker, tabletOnServer2List) {
+ var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false,
marker,
+ tabletOnServer2List) {
@Override
protected Map<UUID,Path> getSortedWALogs() {
return Collections.emptyMap();
@@ -287,8 +287,8 @@ public class GarbageCollectWriteAheadLogsTest {
EasyMock.expectLastCall().once();
EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
- GarbageCollectWriteAheadLogs gc = new
GarbageCollectWriteAheadLogs(context, fs, false,
- tserverSet, marker, tabletOnServer1List) {
+ var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false,
marker,
+ tabletOnServer1List) {
@Override
protected Map<UUID,Path> getSortedWALogs() {
return Collections.emptyMap();