This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 86318b14d6 init method to avoid leaking `this` reference in manager 
constructor (#5732)
86318b14d6 is described below

commit 86318b14d649ab2e64ec2ffefdfcd8c7f531c8aa
Author: Imirie Billey <[email protected]>
AuthorDate: Tue Jul 22 19:16:02 2025 -0400

    init method to avoid leaking `this` reference in manager constructor (#5732)
    
    Created an init method that sets timeKeeper and tserverSet after manager 
has been fully constructed so that an incomplete `this` referenced is not 
passed through those constructors. This init method should be called after 
every new Manager object has been created.
    
    Closes #5673
---
 .../accumulo/server/manager/LiveTServerSet.java    | 29 ++++++++----
 .../accumulo/server/util/FindOfflineTablets.java   |  5 +-
 .../server/util/ListOnlineOnDemandTablets.java     |  5 +-
 .../server/manager/LiveTServerSetTest.java         | 22 ++-------
 .../apache/accumulo/gc/SimpleGarbageCollector.java | 16 +++----
 .../java/org/apache/accumulo/manager/Manager.java  | 11 +++--
 .../org/apache/accumulo/manager/ManagerTime.java   | 55 ++++++++++++++++++----
 .../apache/accumulo/manager/ManagerTimeTest.java   |  7 +++
 8 files changed, 99 insertions(+), 51 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 79a8b1f601..8756b5ca46 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -27,10 +27,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
@@ -77,7 +79,7 @@ public class LiveTServerSet implements ZooCacheWatcher {
 
   private static final Logger log = 
LoggerFactory.getLogger(LiveTServerSet.class);
 
-  private final Listener cback;
+  private final AtomicReference<Listener> cback;
   private final ServerContext context;
 
   public class TServerConnection {
@@ -213,15 +215,24 @@ public class LiveTServerSet implements ZooCacheWatcher {
   // The set of entries in zookeeper without locks, and the first time each 
was noticed
   private final Map<ServiceLockPath,Long> locklessServers = new HashMap<>();
 
-  public LiveTServerSet(ServerContext context, Listener cback) {
-    this.cback = cback;
+  public LiveTServerSet(ServerContext context) {
+    this.cback = new AtomicReference<>(null);
     this.context = context;
-    this.context.getZooCache().addZooCacheWatcher(this);
   }
 
-  public synchronized void startListeningForTabletServerChanges() {
-    scanServers();
+  private Listener getCback() {
+    // fail fast if not yet set
+    return Objects.requireNonNull(cback.get());
+  }
 
+  public synchronized void startListeningForTabletServerChanges(Listener 
cback) {
+    scanServers();
+    Objects.requireNonNull(cback);
+    if (this.cback.compareAndSet(null, cback)) {
+      this.context.getZooCache().addZooCacheWatcher(this);
+    } else if (this.cback.get() != cback) {
+      throw new IllegalStateException("Attempted to set different cback 
object");
+    }
     ThreadPools.watchCriticalScheduledTask(this.context.getScheduledExecutor()
         .scheduleWithFixedDelay(this::scanServers, 5000, 5000, 
TimeUnit.MILLISECONDS));
   }
@@ -252,7 +263,7 @@ public class LiveTServerSet implements ZooCacheWatcher {
         checkServer(updates, doomed, tserverPath);
       }
 
-      this.cback.update(this, doomed, updates);
+      this.getCback().update(this, doomed, updates);
     } catch (Exception ex) {
       log.error("{}", ex.getMessage(), ex);
     }
@@ -365,7 +376,7 @@ public class LiveTServerSet implements ZooCacheWatcher {
                 final Set<TServerInstance> updates = new HashSet<>();
                 final Set<TServerInstance> doomed = new HashSet<>();
                 checkServer(updates, doomed, slp);
-                this.cback.update(this, doomed, updates);
+                this.getCback().update(this, doomed, updates);
               } catch (Exception ex) {
                 log.error("Error processing event for tserver: " + 
slp.toString(), ex);
               }
@@ -467,7 +478,7 @@ public class LiveTServerSet implements ZooCacheWatcher {
     return find(current, tabletServer);
   }
 
-  TServerInstance find(Map<String,TServerInfo> servers, String tabletServer) {
+  static TServerInstance find(Map<String,TServerInfo> servers, String 
tabletServer) {
     HostAndPort addr;
     String sessionId = null;
     if (tabletServer.charAt(tabletServer.length() - 1) == ']') {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
index 6720133dca..85b400b349 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
@@ -64,7 +64,9 @@ public class FindOfflineTablets {
 
     final AtomicBoolean scanning = new AtomicBoolean(false);
 
-    LiveTServerSet tservers = new LiveTServerSet(context, new Listener() {
+    LiveTServerSet tservers = new LiveTServerSet(context);
+
+    tservers.startListeningForTabletServerChanges(new Listener() {
       @Override
       public void update(LiveTServerSet current, Set<TServerInstance> deleted,
           Set<TServerInstance> added) {
@@ -76,7 +78,6 @@ public class FindOfflineTablets {
         }
       }
     });
-    tservers.startListeningForTabletServerChanges();
     scanning.set(true);
 
     int offline = 0;
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java
index fd002d64f6..c0f061ad74 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java
@@ -60,7 +60,9 @@ public class ListOnlineOnDemandTablets {
 
     final AtomicBoolean scanning = new AtomicBoolean(false);
 
-    LiveTServerSet tservers = new LiveTServerSet(context, new Listener() {
+    LiveTServerSet tservers = new LiveTServerSet(context);
+
+    tservers.startListeningForTabletServerChanges(new Listener() {
       @Override
       public void update(LiveTServerSet current, Set<TServerInstance> deleted,
           Set<TServerInstance> added) {
@@ -72,7 +74,6 @@ public class ListOnlineOnDemandTablets {
         }
       }
     });
-    tservers.startListeningForTabletServerChanges();
     scanning.set(true);
 
     System.out.println("Scanning " + SystemTables.METADATA.tableName());
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java
index 64a234f31f..15e9d5e3ac 100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/manager/LiveTServerSetTest.java
@@ -26,9 +26,6 @@ import java.util.Map;
 
 import org.apache.accumulo.core.data.ResourceGroupId;
 import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.zookeeper.ZooCache;
-import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.manager.LiveTServerSet.Listener;
 import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.manager.LiveTServerSet.TServerInfo;
 import org.easymock.EasyMock;
@@ -48,21 +45,10 @@ public class LiveTServerSetTest {
             mockConn, ResourceGroupId.DEFAULT);
     servers.put("server1", server1);
 
-    ServerContext ctx = EasyMock.createMock(ServerContext.class);
-    ZooCache zc = EasyMock.createMock(ZooCache.class);
-    EasyMock.expect(ctx.getZooCache()).andReturn(zc);
-    zc.addZooCacheWatcher(EasyMock.isA(LiveTServerSet.class));
-    EasyMock.replay(ctx, zc);
-
-    LiveTServerSet tservers = new LiveTServerSet(ctx, 
EasyMock.createMock(Listener.class));
-
-    assertEquals(server1.instance, tservers.find(servers, "localhost:1234"));
-    assertNull(tservers.find(servers, "localhost:4321"));
-    assertEquals(server1.instance, tservers.find(servers, 
"localhost:1234[5555]"));
-    assertNull(tservers.find(servers, "localhost:1234[55755]"));
-
-    EasyMock.verify(ctx, zc);
-
+    assertEquals(server1.instance, LiveTServerSet.find(servers, 
"localhost:1234"));
+    assertNull(LiveTServerSet.find(servers, "localhost:4321"));
+    assertEquals(server1.instance, LiveTServerSet.find(servers, 
"localhost:1234[5555]"));
+    assertNull(LiveTServerSet.find(servers, "localhost:1234[55755]"));
   }
 
 }
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 96c23a2d59..17888d334f 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -198,15 +198,15 @@ public class SimpleGarbageCollector extends 
AbstractServer implements Iface {
 
     // This is created outside of the run loop and passed to the 
walogCollector so that
     // only a single timed task is created (internal to LiveTServerSet) using 
SimpleTimer.
-    final LiveTServerSet liveTServerSet =
-        new LiveTServerSet(getContext(), (current, deleted, added) -> {
-          log.debug("Number of current servers {}, tservers added {}, removed 
{}",
-              current == null ? -1 : current.size(), added, deleted);
+    final LiveTServerSet liveTServerSet = new LiveTServerSet(getContext());
+    liveTServerSet.startListeningForTabletServerChanges((current, deleted, 
added) -> {
+      log.debug("Number of current servers {}, tservers added {}, removed {}",
+          current == null ? -1 : current.size(), added, deleted);
 
-          if (log.isTraceEnabled()) {
-            log.trace("Current servers: {}\nAdded: {}\n Removed: {}", current, 
added, deleted);
-          }
-        });
+      if (log.isTraceEnabled()) {
+        log.trace("Current servers: {}\nAdded: {}\n Removed: {}", current, 
added, deleted);
+      }
+    });
 
     while (!isShutdownRequested()) {
       if (Thread.currentThread().isInterrupted()) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index bebdc12237..d72295c3f2 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -500,8 +500,8 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
 
     log.info("Version {}", Constants.VERSION);
     log.info("Instance {}", context.getInstanceID());
-    timeKeeper = new ManagerTime(this, aconf);
-    tserverSet = new LiveTServerSet(context, this);
+    timeKeeper = new ManagerTime();
+    tserverSet = new LiveTServerSet(context);
 
     final long tokenLifetime = 
aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME);
 
@@ -910,6 +910,11 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     final ServerContext context = getContext();
 
     balanceManager.setManager(this);
+    try {
+      timeKeeper.setManager(this);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
 
     // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a 
sign of process health
     // when a hot-standby
@@ -966,7 +971,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     Thread statusThread = Threads.createCriticalThread("Status Thread", new 
StatusThread());
     statusThread.start();
 
-    tserverSet.startListeningForTabletServerChanges();
+    tserverSet.startListeningForTabletServerChanges(this);
     try {
       blockForTservers();
     } catch (InterruptedException ex) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
index dcf2b33d17..2dea80851f 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
@@ -24,11 +24,11 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -42,12 +42,31 @@ import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Keep a persistent roughly monotone view of how long a manager has been 
overseeing this cluster.
+ * <p>
+ * Example of how ManagerTime is expected to work:
+ * <ul>
+ * <li>The first Manager process (MP1) starts in an accumulo cluster</li>
+ * <li>ManagerTime.getTime() is called in MP1 after it has been alive for 30 
secs and returns 30
+ * secs</li>
+ * <li>ManagerTime.getTime() is called in MP1 after it has been alive for 90 
secs and returns 90
+ * secs</li>
+ * <li>MP1 dies after being alive for 120 seconds</li>
+ * <li>After 300 seconds the second manager process starts</li>
+ * <li>ManagerTime.getTime() is called in MP2 after it has been alive for 15 
secs and returns 120+15
+ * secs. The 120 was how long MP1 was alive.</li>
+ * <li>ManagerTime.getTime() is called in MP2 after it has been alive for 45 
secs and returns 120+45
+ * secs.</li>
+ * <li>MP2 dies after being alive for 90 seconds.</li>
+ * <li>After 600 seconds the second manager process starts</li>
+ * <li>ManagerTime.getTime() is called in MP3 after it has been alive for 10 
secs and returns
+ * 120+90+10 secs. The 120 and 90 are the sums of the previous manager 
lifetimes.</li>
+ * </ul>
  */
 public class ManagerTime {
   private static final Logger log = LoggerFactory.getLogger(ManagerTime.class);
 
-  private final ZooReaderWriter zk;
-  private final Manager manager;
+  private ZooReaderWriter zk;
+  private final AtomicReference<Manager> manager;
 
   /**
    * Difference between time stored in ZooKeeper and System.nanoTime() when we 
last read from
@@ -85,12 +104,30 @@ public class ManagerTime {
    * start.</li>
    * </ul>
    */
-  private final AtomicReference<Duration> skewAmount;
+  private AtomicReference<Duration> skewAmount;
 
-  public ManagerTime(Manager manager, AccumuloConfiguration conf) throws 
IOException {
-    this.zk = manager.getContext().getZooSession().asReaderWriter();
-    this.manager = manager;
+  ManagerTime() {
+    this.manager = new AtomicReference<>(null);
+  }
+
+  // Once it's set call constructor
+  public void setManager(Manager manager) throws IOException {
+    Objects.requireNonNull(manager);
+    if (this.manager.compareAndSet(null, manager)) {
+      // I don't want this to throw IOException?
+      initializeManagerTime();
+    } else if (this.manager.get() != manager) {
+      throw new IllegalStateException("Attempted to set different manager 
object");
+    }
+  }
+
+  private Manager getManager() {
+    // fail fast if not yet set
+    return Objects.requireNonNull(manager.get());
+  }
 
+  private void initializeManagerTime() throws IOException {
+    this.zk = getManager().getContext().getZooSession().asReaderWriter();
     try {
       zk.putPersistentData(Constants.ZMANAGER_TICK, "0".getBytes(UTF_8), 
NodeExistsPolicy.SKIP);
       skewAmount = new AtomicReference<>(updateSkew(getZkTime()));
@@ -98,7 +135,7 @@ public class ManagerTime {
       throw new IOException("Error updating manager time", ex);
     }
 
-    
ThreadPools.watchCriticalScheduledTask(manager.getContext().getScheduledExecutor()
+    
ThreadPools.watchCriticalScheduledTask(getManager().getContext().getScheduledExecutor()
         .scheduleWithFixedDelay(Threads.createNamedRunnable("Manager time 
keeper", this::run), 0,
             SECONDS.toMillis(10), MILLISECONDS));
   }
@@ -113,7 +150,7 @@ public class ManagerTime {
   }
 
   public void run() {
-    switch (manager.getManagerState()) {
+    switch (getManager().getManagerState()) {
       // If we don't have the lock, periodically re-read the value in 
ZooKeeper, in case there's
       // another manager we're
       // shadowing for.
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java 
b/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java
index 528eccca6d..80b9c31c58 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.List;
@@ -127,4 +128,10 @@ public class ManagerTimeTest {
     assertTrue(skew.toNanos() - updatedSkew.toNanos() > 0);
     assertTrue(skew.compareTo(updatedSkew) > 0);
   }
+
+  @Test
+  public void testManagerIsNotSet() {
+    var mt = new ManagerTime();
+    assertThrows(NullPointerException.class, mt::getTime);
+  }
 }

Reply via email to