dsmiley commented on code in PR #3851:
URL: https://github.com/apache/solr/pull/3851#discussion_r2508365394


##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -117,7 +119,12 @@ public abstract class CloudSolrClient extends SolrClient {
           // UpdateParams.ROLLBACK
           );
 
-  protected volatile Object[] locks = objectList(3);
+  private final ConcurrentHashMap<String, CompletableFuture<DocCollection>> collectionRefreshes =
+      new ConcurrentHashMap<>();
+  private final Object stateRefreshExecutorLock = new Object();
+  private volatile int stateRefreshParallelism = DEFAULT_STATE_REFRESH_PARALLELISM;
+  private volatile ExecutorService stateRefreshExecutor;

Review Comment:
   I find it disappointing to see yet another ExecutorService (thread pool) for 
this use case, so close to a place where we can call 
HttpSolrClientBase.requestAsync, which uses an existing thread pool and 
non-blocking IO.  The parallelism can be controlled with a semaphore.
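
   A rough sketch of the semaphore idea, not a drop-in for this PR. The class 
`ThrottledAsync` and its `asyncCall` function are hypothetical stand-ins for 
whatever async request method the client already exposes; the only point is 
that a permit is taken before the call starts and released when its future 
completes, so no dedicated pool is needed to bound the number of in-flight 
refreshes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical helper: bounds in-flight async calls without owning a thread pool. */
class ThrottledAsync<K, V> {
  private final Semaphore permits;
  // Assumption: stands in for an existing non-blocking call that returns a future.
  private final Function<K, CompletableFuture<V>> asyncCall;

  ThrottledAsync(int maxInFlight, Function<K, CompletableFuture<V>> asyncCall) {
    this.permits = new Semaphore(maxInFlight);
    this.asyncCall = asyncCall;
  }

  CompletableFuture<V> submit(K key) {
    try {
      permits.acquire(); // the caller waits here when maxInFlight calls are already running
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return CompletableFuture.failedFuture(e);
    }
    CompletableFuture<V> future;
    try {
      future = asyncCall.apply(key);
    } catch (RuntimeException e) {
      permits.release(); // the call never started, so give the permit back
      return CompletableFuture.failedFuture(e);
    }
    // Release the permit on success or failure of the async call.
    return future.whenComplete((value, error) -> permits.release());
  }

  int availablePermits() {
    return permits.availablePermits();
  }
}
```

   Note that `acquire()` blocks the submitting thread once the limit is 
reached; whether that is acceptable, or whether excess requests should be 
queued instead, is a design choice this sketch does not settle.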



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -117,7 +119,12 @@ public abstract class CloudSolrClient extends SolrClient {
           // UpdateParams.ROLLBACK
           );
 
-  protected volatile Object[] locks = objectList(3);
+  private final ConcurrentHashMap<String, CompletableFuture<DocCollection>> collectionRefreshes =

Review Comment:
   Instead of creating an additional cache, I think it would be better to 
integrate this into StateCache, which is already keyed by collection and 
already evicts old entries.



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -902,16 +975,56 @@ protected NamedList<Object> requestWithRetryOnStaleState(
         }
       }
 
-      if (requestedCollections != null) {
-        requestedCollections.clear(); // done with this
-      }
-
       // if the state was stale, then we retry the request once with new state pulled from Zk
       if (stateWasStale) {
         log.warn(
             "Re-trying request to collection(s) {} after stale state error from server.",
             inputCollections);
-        resp = requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
+
+        Map<String, CompletableFuture<DocCollection>> refreshesToWaitFor = pendingRefreshes;
+        if (!waitedForRefresh && (pendingRefreshes == null || pendingRefreshes.isEmpty())) {
+          refreshesToWaitFor = new HashMap<>();
+          for (DocCollection ext : requestedCollections) {
+            refreshesToWaitFor.put(ext.getName(), triggerCollectionRefresh(ext.getName()));
+          }
+        }
+
+        // First retry without sending state versions to avoid needless waits when stale state is
+        // still usable.
+        if (!skipStateVersion && !waitedForRefresh) {
+          resp =
+              requestWithRetryOnStaleState(
+                  request,
+                  retryCount + 1,
+                  inputCollections,
+                  /*skipStateVersion*/ true,
+                  refreshesToWaitFor,
+                  waitedForRefresh);

Review Comment:
   I'd prefer what you do elsewhere: `/*waitedForRefresh*/ true`



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -902,16 +975,56 @@ protected NamedList<Object> requestWithRetryOnStaleState(
         }
       }
 
-      if (requestedCollections != null) {
-        requestedCollections.clear(); // done with this
-      }
-
       // if the state was stale, then we retry the request once with new state pulled from Zk
       if (stateWasStale) {
         log.warn(
             "Re-trying request to collection(s) {} after stale state error from server.",
             inputCollections);
-        resp = requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
+
+        Map<String, CompletableFuture<DocCollection>> refreshesToWaitFor = pendingRefreshes;
+        if (!waitedForRefresh && (pendingRefreshes == null || pendingRefreshes.isEmpty())) {
+          refreshesToWaitFor = new HashMap<>();
+          for (DocCollection ext : requestedCollections) {
+            refreshesToWaitFor.put(ext.getName(), triggerCollectionRefresh(ext.getName()));
+          }
+        }
+
+        // First retry without sending state versions to avoid needless waits when stale state is

Review Comment:
   Could you elaborate on why "sending state versions" (I assume `_stateVer_`) 
is a problem?



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -195,14 +206,39 @@ boolean shouldRetry() {
     void setRetriedAt() {
       retriedAtNano = System.nanoTime();
     }
+
+    boolean markMaybeStaleIfOutsideBackoff(long retryBackoffNano) {

Review Comment:
   A bit of Javadoc would be helpful here, especially to document what the 
return value means.



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1135,51 +1252,129 @@ public boolean isDirectUpdatesToLeadersOnly() {
     return directUpdatesToLeadersOnly;
   }
 
-  protected static Object[] objectList(int n) {
-    Object[] l = new Object[n];
-    for (int i = 0; i < n; i++) {
-      l[i] = new Object();
+  protected DocCollection getDocCollection(String collection, Integer expectedVersion)
+      throws SolrException {
+    if (expectedVersion == null) {
+      expectedVersion = -1;
     }
-    return l;
+    if (collection == null) {
+      return null;
+    }
+
+    ExpiringCachedDocCollection cacheEntry = collectionStateCache.peek(collection);
+    if (cacheEntry != null && cacheEntry.isExpired(collectionStateCache.timeToLiveMs)) {
+      collectionStateCache.remove(collection, cacheEntry);
+      cacheEntry = null;
+    }
+
+    DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+
+    if (cacheEntry != null && cacheEntry.shouldRetry()) {
+      triggerCollectionRefresh(collection);
+    }
+
+    if (cached != null && expectedVersion <= cached.getZNodeVersion()) {
+      return cached;
+    }
+
+    CompletableFuture<DocCollection> refreshFuture = triggerCollectionRefresh(collection);
+    return waitForCollectionRefresh(collection, refreshFuture);
   }
 
-  protected DocCollection getDocCollection(String collection, Integer expectedVersion)
-      throws SolrException {
-    if (expectedVersion == null) expectedVersion = -1;
-    if (collection == null) return null;
-    ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(collection);
-    DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
-    if (col != null) {
-      if (expectedVersion <= col.getZNodeVersion() && !cacheEntry.shouldRetry()) return col;
+  private CompletableFuture<DocCollection> triggerCollectionRefresh(String collection) {
+    if (closed) {
+      ExpiringCachedDocCollection cacheEntry = collectionStateCache.peek(collection);
+      DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
+      return CompletableFuture.completedFuture(cached);
     }
+    return collectionRefreshes.computeIfAbsent(
+        collection,
+        key -> {
+          ExecutorService executor = stateRefreshExecutor;
+          CompletableFuture<DocCollection> future;
+          if (executor == null || ExecutorUtil.isShutdown(executor)) {
+            future = new CompletableFuture<>();
+            try {
+              future.complete(loadDocCollection(key));
+            } catch (Throwable t) {
+              future.completeExceptionally(t);
+            }
+          } else {
+          future = CompletableFuture.supplyAsync(() -> loadDocCollection(key), executor);
+          }
+          future.whenComplete(

Review Comment:
   I think `whenComplete` could be invoked by the caller thread if the future 
has already completed.  The lambda you pass in manipulates 
`collectionRefreshes`, which violates the rules for the compute methods of a 
ConcurrentHashMap.
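
   To illustrate the concern, here is a minimal sketch of one way to keep the 
cleanup callback out of the mapping function. Everything in it 
(`InFlightLoads`, `load`, the `loader` function) is hypothetical and not taken 
from the PR. It assumes the entry can be published with `putIfAbsent` using a 
placeholder future, so that the `remove` runs as an ordinary map call even when 
the callback fires synchronously on the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical de-duplicating loader; names are illustrative, not from the PR. */
class InFlightLoads<V> {
  private final ConcurrentHashMap<String, CompletableFuture<V>> inFlight =
      new ConcurrentHashMap<>();

  CompletableFuture<V> load(String key, Function<String, CompletableFuture<V>> loader) {
    CompletableFuture<V> created = new CompletableFuture<>();
    CompletableFuture<V> existing = inFlight.putIfAbsent(key, created);
    if (existing != null) {
      return existing; // another caller already owns the load for this key
    }
    // Registered outside any compute method, so a synchronous callback only
    // performs a plain remove(key, value) rather than a nested map update.
    created.whenComplete((value, error) -> inFlight.remove(key, created));
    try {
      loader.apply(key).whenComplete((value, error) -> {
        if (error != null) {
          created.completeExceptionally(error);
        } else {
          created.complete(value);
        }
      });
    } catch (RuntimeException e) {
      created.completeExceptionally(e);
    }
    return created;
  }

  int inFlightCount() {
    return inFlight.size();
  }

  /** Shows that a callback on an already-completed future runs on the registering thread. */
  static boolean callbackRunsOnCallerThread() {
    Thread caller = Thread.currentThread();
    Thread[] seen = new Thread[1];
    CompletableFuture.completedFuture("done")
        .whenComplete((value, error) -> seen[0] = Thread.currentThread());
    return seen[0] == caller;
  }
}
```

   The placeholder costs one extra future per load, but it means no code path 
touches the map from inside a mapping function.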



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

