dsmiley commented on code in PR #760:
URL: https://github.com/apache/solr/pull/760#discussion_r1520372343


##########
solr/core/src/java/org/apache/solr/cloud/ZkController.java:
##########
@@ -1187,25 +1178,19 @@ private void createEphemeralLiveNode() throws 
KeeperException, InterruptedExcept
     String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
     log.info("Register node as live in ZooKeeper:{}", nodePath);
     Map<NodeRoles.Role, String> roles = cc.nodeRoles.getRoles();
-    List<Op> ops = new ArrayList<>(roles.size() + 1);
-    ops.add(
-        Op.create(
-            nodePath,
-            null,
-            zkClient.getZkACLProvider().getACLsToAdd(nodePath),

Review Comment:
   I'm wondering where ACLs are handled now.



##########
solr/core/src/java/org/apache/solr/cloud/ZkController.java:
##########
@@ -366,7 +361,6 @@ public ZkController(
             .withUrl(zkServerAddress)
             .withTimeout(clientTimeout, TimeUnit.MILLISECONDS)
             .withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
-            .withConnStrategy(strat)

Review Comment:
   What happened with this?



##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java:
##########
@@ -1048,6 +1125,11 @@ public void downloadFromZK(String zkPath, Path dir) 
throws IOException {
     ZkMaintenanceUtils.downloadFromZK(this, zkPath, dir);
   }
 
+  @FunctionalInterface
+  public interface IsClosed {

Review Comment:
   Does this new interface need to be public?



##########
solr/test-framework/build.gradle:
##########
@@ -43,6 +43,17 @@ dependencies {
   var zkExcludes = {
     exclude group: "org.apache.yetus", module: "audience-annotations"
   }
+  api('org.apache.curator:curator-client', {

Review Comment:
   It's debatable whether "api" is appropriate here.



##########
solr/core/build.gradle:
##########
@@ -121,6 +121,16 @@ dependencies {
   implementation 'org.eclipse.jetty:jetty-io'
   implementation 'org.eclipse.jetty.toolchain:jetty-servlet-api'
 
+  implementation('org.apache.curator:curator-framework', {

Review Comment:
   Rather minor, but I think moving this below, so that we have a "ZooKeeper / Curator" section, would be more organized.



##########
solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java:
##########
@@ -398,16 +397,6 @@ private void retryRegisterWatcher() {
         return;
       } catch (KeeperException e) {
         log.warn("Failed watching shard term for collection: {}, retrying!", 
collection, e);
-        try {

Review Comment:
   Why was this removed?



##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java:
##########
@@ -140,87 +145,92 @@ private SolrZkClient(
       return;
     }
     this.zkServerAddress = zkServerAddress;
-    this.higherLevelIsClosed = higherLevelIsClosed;
-    if (strat == null) {
-      String connectionStrategy = 
System.getProperty("solr.zookeeper.connectionStrategy");
-      strat =
-          ZkClientConnectionStrategy.forName(connectionStrategy, new 
DefaultConnectionStrategy());
+    String chroot, zkHost;
+    int chrootIndex = zkServerAddress.indexOf('/');
+    if (chrootIndex == -1) {
+      zkHost = zkServerAddress;
+      chroot = null;
+    } else if (chrootIndex == zkServerAddress.length() - 1) {
+      zkHost = zkServerAddress.substring(0, zkServerAddress.length() - 1);
+      chroot = null;
+    } else {
+      zkHost = zkServerAddress.substring(0, chrootIndex);
+      chroot = zkServerAddress.substring(chrootIndex + 1);
     }
-    this.zkClientConnectionStrategy = strat;
 
+    this.higherLevelIsClosed = higherLevelIsClosed;
     this.solrClassLoader = solrClassLoader;
-    if (!strat.hasZkCredentialsToAddAutomatically()) {
+    if (zkCredentialsProvider == null) {
       zkCredentialsInjector =
           useDefaultCredsAndACLs
               ? createZkCredentialsInjector()
               : new DefaultZkCredentialsInjector();
-      ZkCredentialsProvider zkCredentialsToAddAutomatically =
+      zkCredentialsProvider =
           useDefaultCredsAndACLs
               ? createZkCredentialsToAddAutomatically()
               : new DefaultZkCredentialsProvider();
-      
strat.setZkCredentialsToAddAutomatically(zkCredentialsToAddAutomatically);
+    }
+    if (aclProvider == null) {
+      aclProvider = createACLProvider();
+    }
+    if (chroot != null && aclProvider instanceof SecurityAwareZkACLProvider) {
+      this.aclProvider = ((SecurityAwareZkACLProvider) 
aclProvider).withChroot(chroot);
+    } else {
+      this.aclProvider = aclProvider;
     }
 
     this.zkClientTimeout = zkClientTimeout;
-    // we must retry at least as long as the session timeout
-    zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout, 
SolrZkClient.this::isClosed);
-    connManager =
-        new ConnectionManager(
-            "ZooKeeperConnection Watcher:" + zkServerAddress,
-            this,
-            zkServerAddress,
-            strat,
-            onReconnect,
-            beforeReconnect,
-            SolrZkClient.this::isClosed);
 
-    try {
-      strat.connect(
-          zkServerAddress,
-          zkClientTimeout,
-          wrapWatcher(connManager),
-          zooKeeper -> {
-            ZooKeeper oldKeeper = keeper;
-            keeper = zooKeeper;
-            try {
-              closeKeeper(oldKeeper);
-            } finally {
-              if (isClosed) {
-                // we may have been closed
-                closeKeeper(SolrZkClient.this.keeper);
-              }
-            }
-          });
-    } catch (Exception e) {
-      connManager.close();
-      if (keeper != null) {
-        try {
-          keeper.close();
-        } catch (InterruptedException e1) {
-          Thread.currentThread().interrupt();
-        }
-      }
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    curatorSafeServiceExecutor =
+        ExecutorUtil.newMDCAwareSingleThreadExecutor(
+            new SolrNamedThreadFactory("curator-safeService"));
+
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    var clientBuilder =
+        CuratorFrameworkFactory.builder()
+            .ensembleProvider(new FixedEnsembleProvider(zkHost))
+            .namespace(chroot)
+            .sessionTimeoutMs(zkClientTimeout)
+            .connectionTimeoutMs(clientConnectTimeout)
+            .aclProvider(this.aclProvider)
+            .authorization(zkCredentialsProvider.getCredentials())
+            .retryPolicy(retryPolicy)
+            .runSafeService(curatorSafeServiceExecutor);
+
+    client = clientBuilder.build();

Review Comment:
   Then why declare clientBuilder? Just tack on build() so that only the client variable is needed.



##########
solr/modules/hadoop-auth/src/java/org/apache/solr/security/hadoop/DelegationTokenKerberosFilter.java:
##########


Review Comment:
   Nice changes here to use Solr's Curator.



##########
solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java:
##########
@@ -246,24 +246,19 @@ public byte[] remove() throws NoSuchElementException, 
KeeperException, Interrupt
 
   public void remove(Collection<String> paths) throws KeeperException, 
InterruptedException {
     if (paths.isEmpty()) return;
-    List<Op> ops = new ArrayList<>();
+    List<SolrZkClient.CuratorOpBuilder> ops = new ArrayList<>();
     for (String path : paths) {
-      ops.add(Op.delete(dir + "/" + path, -1));
+      ops.add(op -> op.delete().withVersion(-1).forPath(dir + "/" + path));
     }
     for (int from = 0; from < ops.size(); from += 1000) {
       int to = Math.min(from + 1000, ops.size());
       if (from < to) {
-        try {
-          zookeeper.multi(ops.subList(from, to), true);
-        } catch (KeeperException.NoNodeException e) {
-          // don't know which nodes are not exist, so try to delete one by one 
node
-          for (int j = from; j < to; j++) {
+        Collection<CuratorTransactionResult> results = 
zookeeper.multi(ops.subList(from, to));
+        for (CuratorTransactionResult result : results) {
+          if (result.getError() != 0) {
             try {
-              zookeeper.delete(ops.get(j).getPath(), -1, true);
-            } catch (KeeperException.NoNodeException e2) {
-              if (log.isDebugEnabled()) {
-                log.debug("Can not remove node which is not exist : {}", 
ops.get(j).getPath());

Review Comment:
   Did you remove this because you saw too many of these?



##########
solr/test-framework/build.gradle:
##########
@@ -43,6 +43,17 @@ dependencies {
   var zkExcludes = {
     exclude group: "org.apache.yetus", module: "audience-annotations"
   }
+  api('org.apache.curator:curator-client', {
+    exclude group: 'org.apache.zookeeper', module: 'zookeeper'
+  })
+  api('org.apache.curator:curator-framework', {
+    exclude group: 'org.apache.zookeeper', module: 'zookeeper'
+  })
+  api('org.apache.curator:curator-test') {

Review Comment:
   Thus all solr-test-framework users (plugin users outside of the project) will need curator-test. Ah, I see this is because ChaosMonkey is here and uses the KillSession thing. Okay, a little comment might be nice.



##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java:
##########
@@ -993,14 +1051,33 @@ public void updateACLs(final String root) throws 
KeeperException, InterruptedExc
         ZkMaintenanceUtils.VISIT_ORDER.VISIT_POST,
         path -> {
           try {
-            setACL(path, getZkACLProvider().getACLsToAdd(path), true);
-            log.debug("Updated ACL on {}", path);
+            runWithCorrectThrows(
+                "updating acls", () -> 
client.setACL().withACL(null).forPath(path));
           } catch (NoNodeException ignored) {
             // If a node was deleted, don't bother trying to set ACLs on it.
           }
         });
   }
 
+  @FunctionalInterface
+  protected interface SupplierWithException<T> {
+    T get() throws Exception;
+  }
+
+  protected <T> T runWithCorrectThrows(String action, SupplierWithException<T> 
func)
+      throws KeeperException, InterruptedException {

Review Comment:
   But we catch InterruptedException now, so does it still need to be declared in the throws clause?



##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java:
##########
@@ -140,87 +145,92 @@ private SolrZkClient(
       return;
     }
     this.zkServerAddress = zkServerAddress;
-    this.higherLevelIsClosed = higherLevelIsClosed;
-    if (strat == null) {
-      String connectionStrategy = 
System.getProperty("solr.zookeeper.connectionStrategy");
-      strat =
-          ZkClientConnectionStrategy.forName(connectionStrategy, new 
DefaultConnectionStrategy());
+    String chroot, zkHost;
+    int chrootIndex = zkServerAddress.indexOf('/');
+    if (chrootIndex == -1) {
+      zkHost = zkServerAddress;
+      chroot = null;
+    } else if (chrootIndex == zkServerAddress.length() - 1) {
+      zkHost = zkServerAddress.substring(0, zkServerAddress.length() - 1);
+      chroot = null;
+    } else {
+      zkHost = zkServerAddress.substring(0, chrootIndex);
+      chroot = zkServerAddress.substring(chrootIndex + 1);
     }
-    this.zkClientConnectionStrategy = strat;
 
+    this.higherLevelIsClosed = higherLevelIsClosed;
     this.solrClassLoader = solrClassLoader;
-    if (!strat.hasZkCredentialsToAddAutomatically()) {
+    if (zkCredentialsProvider == null) {
       zkCredentialsInjector =
           useDefaultCredsAndACLs
               ? createZkCredentialsInjector()
               : new DefaultZkCredentialsInjector();
-      ZkCredentialsProvider zkCredentialsToAddAutomatically =
+      zkCredentialsProvider =
           useDefaultCredsAndACLs
               ? createZkCredentialsToAddAutomatically()
               : new DefaultZkCredentialsProvider();
-      
strat.setZkCredentialsToAddAutomatically(zkCredentialsToAddAutomatically);
+    }
+    if (aclProvider == null) {
+      aclProvider = createACLProvider();
+    }
+    if (chroot != null && aclProvider instanceof SecurityAwareZkACLProvider) {
+      this.aclProvider = ((SecurityAwareZkACLProvider) 
aclProvider).withChroot(chroot);
+    } else {
+      this.aclProvider = aclProvider;
     }
 
     this.zkClientTimeout = zkClientTimeout;
-    // we must retry at least as long as the session timeout
-    zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout, 
SolrZkClient.this::isClosed);
-    connManager =
-        new ConnectionManager(
-            "ZooKeeperConnection Watcher:" + zkServerAddress,
-            this,
-            zkServerAddress,
-            strat,
-            onReconnect,
-            beforeReconnect,
-            SolrZkClient.this::isClosed);
 
-    try {
-      strat.connect(
-          zkServerAddress,
-          zkClientTimeout,
-          wrapWatcher(connManager),
-          zooKeeper -> {
-            ZooKeeper oldKeeper = keeper;
-            keeper = zooKeeper;
-            try {
-              closeKeeper(oldKeeper);
-            } finally {
-              if (isClosed) {
-                // we may have been closed
-                closeKeeper(SolrZkClient.this.keeper);
-              }
-            }
-          });
-    } catch (Exception e) {
-      connManager.close();
-      if (keeper != null) {
-        try {
-          keeper.close();
-        } catch (InterruptedException e1) {
-          Thread.currentThread().interrupt();
-        }
-      }
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    curatorSafeServiceExecutor =
+        ExecutorUtil.newMDCAwareSingleThreadExecutor(
+            new SolrNamedThreadFactory("curator-safeService"));
+
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    var clientBuilder =
+        CuratorFrameworkFactory.builder()
+            .ensembleProvider(new FixedEnsembleProvider(zkHost))
+            .namespace(chroot)
+            .sessionTimeoutMs(zkClientTimeout)
+            .connectionTimeoutMs(clientConnectTimeout)
+            .aclProvider(this.aclProvider)
+            .authorization(zkCredentialsProvider.getCredentials())
+            .retryPolicy(retryPolicy)
+            .runSafeService(curatorSafeServiceExecutor);
+
+    client = clientBuilder.build();
+    if (onReconnect != null) {
+      client
+          .getConnectionStateListenable()
+          .addListener(onReconnect, zkConnectionListenerCallbackExecutor);
     }
-
+    if (beforeReconnect != null) {
+      client
+          .getConnectionStateListenable()
+          .addListener(beforeReconnect, zkConnectionListenerCallbackExecutor);
+    }
+    client.start();
     try {
-      connManager.waitForConnected(clientConnectTimeout);
+      if (!client.blockUntilConnected(clientConnectTimeout, 
TimeUnit.MILLISECONDS)) {
+        throw new TimeoutException(
+            String.format(
+                Locale.ROOT,
+                "Timeout while waiting for Zookeeper Client to connect: %d ms",
+                clientConnectTimeout));
+      }
+      ;

Review Comment:
   Extra semicolon.



##########
solr/solrj-zookeeper/build.gradle:
##########
@@ -32,6 +32,13 @@ dependencies {
 
     implementation project(':solr:solrj')
 
+    api('org.apache.curator:curator-client', {

Review Comment:
   Why "api"? Not even ZK below is API-level. From the users' perspective, I view this as more of an internal dependency.



##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -791,7 +792,14 @@ public void expireZkSession(JettySolrRunner jetty) {
     CoreContainer cores = jetty.getCoreContainer();
     if (cores != null) {
       ChaosMonkey.causeConnectionLoss(jetty);
-      
zkServer.expire(cores.getZkController().getZkClient().getZooKeeper().getSessionId());
+      SolrZkClient zkClient = cores.getZkController().getZkClient();
+      long sessionId = zkClient.getZkSessionId();
+      zkServer.expire(sessionId);
+      try {
+        
KillSession.kill(zkClient.getCuratorFramework().getZookeeperClient().getZooKeeper());

Review Comment:
   Didn't you add a convenience method for that? If I'm wrong, never mind.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org
For additional commands, e-mail: issues-h...@solr.apache.org

Reply via email to