This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 86318b14d6 init method to avoid leaking `this` reference in manager
constructor (#5732)
86318b14d6 is described below
commit 86318b14d649ab2e64ec2ffefdfcd8c7f531c8aa
Author: Imirie Billey <[email protected]>
AuthorDate: Tue Jul 22 19:16:02 2025 -0400
init method to avoid leaking `this` reference in manager constructor (#5732)
Created an init method that sets timeKeeper and tserverSet after manager
has been fully constructed so that an incomplete `this` reference is not
passed through those constructors. This init method should be called after
every new Manager object has been created.
Closes #5673
---
.../accumulo/server/manager/LiveTServerSet.java | 29 ++++++++----
.../accumulo/server/util/FindOfflineTablets.java | 5 +-
.../server/util/ListOnlineOnDemandTablets.java | 5 +-
.../server/manager/LiveTServerSetTest.java | 22 ++-------
.../apache/accumulo/gc/SimpleGarbageCollector.java | 16 +++----
.../java/org/apache/accumulo/manager/Manager.java | 11 +++--
.../org/apache/accumulo/manager/ManagerTime.java | 55 ++++++++++++++++++----
.../apache/accumulo/manager/ManagerTimeTest.java | 7 +++
8 files changed, 99 insertions(+), 51 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 79a8b1f601..8756b5ca46 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -27,10 +27,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
@@ -77,7 +79,7 @@ public class LiveTServerSet implements ZooCacheWatcher {
private static final Logger log =
LoggerFactory.getLogger(LiveTServerSet.class);
- private final Listener cback;
+ private final AtomicReference<Listener> cback;
private final ServerContext context;
public class TServerConnection {
@@ -213,15 +215,24 @@ public class LiveTServerSet implements ZooCacheWatcher {
// The set of entries in zookeeper without locks, and the first time each
was noticed
private final Map<ServiceLockPath,Long> locklessServers = new HashMap<>();
- public LiveTServerSet(ServerContext context, Listener cback) {
- this.cback = cback;
+ public LiveTServerSet(ServerContext context) {
+ this.cback = new AtomicReference<>(null);
this.context = context;
- this.context.getZooCache().addZooCacheWatcher(this);
}
- public synchronized void startListeningForTabletServerChanges() {
- scanServers();
+ private Listener getCback() {
+ // fail fast if not yet set
+ return Objects.requireNonNull(cback.get());
+ }
+ public synchronized void startListeningForTabletServerChanges(Listener
cback) {
+ scanServers();
+ Objects.requireNonNull(cback);
+ if (this.cback.compareAndSet(null, cback)) {
+ this.context.getZooCache().addZooCacheWatcher(this);
+ } else if (this.cback.get() != cback) {
+ throw new IllegalStateException("Attempted to set different cback
object");
+ }
ThreadPools.watchCriticalScheduledTask(this.context.getScheduledExecutor()
.scheduleWithFixedDelay(this::scanServers, 5000, 5000,
TimeUnit.MILLISECONDS));
}
@@ -252,7 +263,7 @@ public class LiveTServerSet implements ZooCacheWatcher {
checkServer(updates, doomed, tserverPath);
}
- this.cback.update(this, doomed, updates);
+ this.getCback().update(this, doomed, updates);
} catch (Exception ex) {
log.error("{}", ex.getMessage(), ex);
}
@@ -365,7 +376,7 @@ public class LiveTServerSet implements ZooCacheWatcher {
final Set<TServerInstance> updates = new HashSet<>();
final Set<TServerInstance> doomed = new HashSet<>();
checkServer(updates, doomed, slp);
- this.cback.update(this, doomed, updates);
+ this.getCback().update(this, doomed, updates);
} catch (Exception ex) {
log.error("Error processing event for tserver: " +
slp.toString(), ex);
}
@@ -467,7 +478,7 @@ public class LiveTServerSet implements ZooCacheWatcher {
return find(current, tabletServer);
}
- TServerInstance find(Map<String,TServerInfo> servers, String tabletServer) {
+ static TServerInstance find(Map<String,TServerInfo> servers, String
tabletServer) {
HostAndPort addr;
String sessionId = null;
if (tabletServer.charAt(tabletServer.length() - 1) == ']') {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
index 6720133dca..85b400b349 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
@@ -64,7 +64,9 @@ public class FindOfflineTablets {
final AtomicBoolean scanning = new AtomicBoolean(false);
- LiveTServerSet tservers = new LiveTServerSet(context, new Listener() {
+ LiveTServerSet tservers = new LiveTServerSet(context);
+
+ tservers.startListeningForTabletServerChanges(new Listener() {
@Override
public void update(LiveTServerSet current, Set<TServerInstance> deleted,
Set<TServerInstance> added) {
@@ -76,7 +78,6 @@ public class FindOfflineTablets {
}
}
});
- tservers.startListeningForTabletServerChanges();
scanning.set(true);
int offline = 0;
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java
index fd002d64f6..c0f061ad74 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java
@@ -60,7 +60,9 @@ public class ListOnlineOnDemandTablets {
final AtomicBoolean scanning = new AtomicBoolean(false);
- LiveTServerSet tservers = new LiveTServerSet(context, new Listener() {
+ LiveTServerSet tservers = new LiveTServerSet(context);
+
+ tservers.startListeningForTabletServerChanges(new Listener() {
@Override
public void update(LiveTServerSet current, Set<TServerInstance> deleted,
Set<TServerInstance> added) {
@@ -72,7 +74,6 @@ public class ListOnlineOnDemandTablets {
}
}
});
- tservers.startListeningForTabletServerChanges();
scanning.set(true);
System.out.println("Scanning " + SystemTables.METADATA.tableName());
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java
b/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java
index 64a234f31f..15e9d5e3ac 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java
@@ -26,9 +26,6 @@ import java.util.Map;
import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.zookeeper.ZooCache;
-import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.manager.LiveTServerSet.Listener;
import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.manager.LiveTServerSet.TServerInfo;
import org.easymock.EasyMock;
@@ -48,21 +45,10 @@ public class LiveTServerSetTest {
mockConn, ResourceGroupId.DEFAULT);
servers.put("server1", server1);
- ServerContext ctx = EasyMock.createMock(ServerContext.class);
- ZooCache zc = EasyMock.createMock(ZooCache.class);
- EasyMock.expect(ctx.getZooCache()).andReturn(zc);
- zc.addZooCacheWatcher(EasyMock.isA(LiveTServerSet.class));
- EasyMock.replay(ctx, zc);
-
- LiveTServerSet tservers = new LiveTServerSet(ctx,
EasyMock.createMock(Listener.class));
-
- assertEquals(server1.instance, tservers.find(servers, "localhost:1234"));
- assertNull(tservers.find(servers, "localhost:4321"));
- assertEquals(server1.instance, tservers.find(servers,
"localhost:1234[5555]"));
- assertNull(tservers.find(servers, "localhost:1234[55755]"));
-
- EasyMock.verify(ctx, zc);
-
+ assertEquals(server1.instance, LiveTServerSet.find(servers,
"localhost:1234"));
+ assertNull(LiveTServerSet.find(servers, "localhost:4321"));
+ assertEquals(server1.instance, LiveTServerSet.find(servers,
"localhost:1234[5555]"));
+ assertNull(LiveTServerSet.find(servers, "localhost:1234[55755]"));
}
}
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 96c23a2d59..17888d334f 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -198,15 +198,15 @@ public class SimpleGarbageCollector extends
AbstractServer implements Iface {
// This is created outside of the run loop and passed to the
walogCollector so that
// only a single timed task is created (internal to LiveTServerSet) using
SimpleTimer.
- final LiveTServerSet liveTServerSet =
- new LiveTServerSet(getContext(), (current, deleted, added) -> {
- log.debug("Number of current servers {}, tservers added {}, removed
{}",
- current == null ? -1 : current.size(), added, deleted);
+ final LiveTServerSet liveTServerSet = new LiveTServerSet(getContext());
+ liveTServerSet.startListeningForTabletServerChanges((current, deleted,
added) -> {
+ log.debug("Number of current servers {}, tservers added {}, removed {}",
+ current == null ? -1 : current.size(), added, deleted);
- if (log.isTraceEnabled()) {
- log.trace("Current servers: {}\nAdded: {}\n Removed: {}", current,
added, deleted);
- }
- });
+ if (log.isTraceEnabled()) {
+ log.trace("Current servers: {}\nAdded: {}\n Removed: {}", current,
added, deleted);
+ }
+ });
while (!isShutdownRequested()) {
if (Thread.currentThread().isInterrupted()) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index bebdc12237..d72295c3f2 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -500,8 +500,8 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener {
log.info("Version {}", Constants.VERSION);
log.info("Instance {}", context.getInstanceID());
- timeKeeper = new ManagerTime(this, aconf);
- tserverSet = new LiveTServerSet(context, this);
+ timeKeeper = new ManagerTime();
+ tserverSet = new LiveTServerSet(context);
final long tokenLifetime =
aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME);
@@ -910,6 +910,11 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener {
final ServerContext context = getContext();
balanceManager.setManager(this);
+ try {
+ timeKeeper.setManager(this);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
// ACCUMULO-4424 Put up the Thrift servers before getting the lock as a
sign of process health
// when a hot-standby
@@ -966,7 +971,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener {
Thread statusThread = Threads.createCriticalThread("Status Thread", new
StatusThread());
statusThread.start();
- tserverSet.startListeningForTabletServerChanges();
+ tserverSet.startListeningForTabletServerChanges(this);
try {
blockForTservers();
} catch (InterruptedException ex) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
index dcf2b33d17..2dea80851f 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
@@ -24,11 +24,11 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
import java.time.Duration;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -42,12 +42,31 @@ import com.google.common.annotations.VisibleForTesting;
/**
* Keep a persistent roughly monotone view of how long a manager has been
overseeing this cluster.
+ * <p>
+ * Example of how ManagerTime is expected to work:
+ * <ul>
+ * <li>The first Manager process (MP1) starts in an accumulo cluster</li>
+ * <li>ManagerTime.getTime() is called in MP1 after it has been alive for 30
secs and returns 30
+ * secs</li>
+ * <li>ManagerTime.getTime() is called in MP1 after it has been alive for 90
secs and returns 90
+ * secs</li>
+ * <li>MP1 dies after being alive for 120 seconds</li>
+ * <li>After 300 seconds the second manager process starts</li>
+ * <li>ManagerTime.getTime() is called in MP2 after it has been alive for 15
secs and returns 120+15
+ * secs. The 120 was how long MP1 was alive.</li>
+ * <li>ManagerTime.getTime() is called in MP2 after it has been alive for 45
secs and returns 120+45
+ * secs.</li>
+ * <li>MP2 dies after being alive for 90 seconds.</li>
+ * <li>After 600 seconds the third manager process starts</li>
+ * <li>ManagerTime.getTime() is called in MP3 after it has been alive for 10
secs and returns
+ * 120+90+10 secs. The 120 and 90 are the sums of the previous manager
lifetimes.</li>
+ * </ul>
*/
public class ManagerTime {
private static final Logger log = LoggerFactory.getLogger(ManagerTime.class);
- private final ZooReaderWriter zk;
- private final Manager manager;
+ private ZooReaderWriter zk;
+ private final AtomicReference<Manager> manager;
/**
* Difference between time stored in ZooKeeper and System.nanoTime() when we
last read from
@@ -85,12 +104,30 @@ public class ManagerTime {
* start.</li>
* </ul>
*/
- private final AtomicReference<Duration> skewAmount;
+ private AtomicReference<Duration> skewAmount;
- public ManagerTime(Manager manager, AccumuloConfiguration conf) throws
IOException {
- this.zk = manager.getContext().getZooSession().asReaderWriter();
- this.manager = manager;
+ ManagerTime() {
+ this.manager = new AtomicReference<>(null);
+ }
+
+ // Once the manager is set, run the initialization formerly done in the constructor
+ public void setManager(Manager manager) throws IOException {
+ Objects.requireNonNull(manager);
+ if (this.manager.compareAndSet(null, manager)) {
+ // I don't want this to throw IOException?
+ initializeManagerTime();
+ } else if (this.manager.get() != manager) {
+ throw new IllegalStateException("Attempted to set different manager
object");
+ }
+ }
+
+ private Manager getManager() {
+ // fail fast if not yet set
+ return Objects.requireNonNull(manager.get());
+ }
+ private void initializeManagerTime() throws IOException {
+ this.zk = getManager().getContext().getZooSession().asReaderWriter();
try {
zk.putPersistentData(Constants.ZMANAGER_TICK, "0".getBytes(UTF_8),
NodeExistsPolicy.SKIP);
skewAmount = new AtomicReference<>(updateSkew(getZkTime()));
@@ -98,7 +135,7 @@ public class ManagerTime {
throw new IOException("Error updating manager time", ex);
}
-
ThreadPools.watchCriticalScheduledTask(manager.getContext().getScheduledExecutor()
+
ThreadPools.watchCriticalScheduledTask(getManager().getContext().getScheduledExecutor()
.scheduleWithFixedDelay(Threads.createNamedRunnable("Manager time
keeper", this::run), 0,
SECONDS.toMillis(10), MILLISECONDS));
}
@@ -113,7 +150,7 @@ public class ManagerTime {
}
public void run() {
- switch (manager.getManagerState()) {
+ switch (getManager().getManagerState()) {
// If we don't have the lock, periodically re-read the value in
ZooKeeper, in case there's
// another manager we're
// shadowing for.
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java
index 528eccca6d..80b9c31c58 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
@@ -127,4 +128,10 @@ public class ManagerTimeTest {
assertTrue(skew.toNanos() - updatedSkew.toNanos() > 0);
assertTrue(skew.compareTo(updatedSkew) > 0);
}
+
+ @Test
+ public void testManagerIsNotSet() {
+ var mt = new ManagerTime();
+ assertThrows(NullPointerException.class, mt::getTime);
+ }
}