dsmiley commented on code in PR #4176:
URL: https://github.com/apache/solr/pull/4176#discussion_r2870231950
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -105,7 +105,7 @@ public abstract class CloudSolrClient extends SolrClient {
private final boolean directUpdatesToLeadersOnly;
private final RequestReplicaListTransformerGenerator requestRLTGenerator;
private final boolean parallelUpdates;
- private ExecutorService threadPool =
+ private final ExecutorService threadPool =
Review Comment:
not critical but it's needless that this was non-final.
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1658,41 +1657,34 @@ protected DocCollection getDocCollection(String
collection, Integer expectedVers
}
private CompletableFuture<DocCollection> triggerCollectionRefresh(String
collection) {
- if (closed) {
- ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(collection);
- DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
- return CompletableFuture.completedFuture(cached);
- }
- return collectionRefreshes.computeIfAbsent(
+ return collectionRefreshes.compute(
collection,
- key -> {
- ExecutorService executor = threadPool;
- CompletableFuture<DocCollection> future;
- if (executor == null || ExecutorUtil.isShutdown(executor)) {
- future = new CompletableFuture<>();
- try {
- future.complete(loadDocCollection(key));
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
+ (key, existingFuture) -> {
+ // A refresh is still in progress; return it.
+ if (existingFuture != null && !existingFuture.isDone()) {
+ return existingFuture;
+ }
Review Comment:
This is the essence of the fix. Everything else in the PR is an improvement
but non-critical.
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1658,41 +1657,34 @@ protected DocCollection getDocCollection(String
collection, Integer expectedVers
}
private CompletableFuture<DocCollection> triggerCollectionRefresh(String
collection) {
- if (closed) {
- ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(collection);
- DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
- return CompletableFuture.completedFuture(cached);
- }
- return collectionRefreshes.computeIfAbsent(
+ return collectionRefreshes.compute(
collection,
- key -> {
- ExecutorService executor = threadPool;
- CompletableFuture<DocCollection> future;
- if (executor == null || ExecutorUtil.isShutdown(executor)) {
- future = new CompletableFuture<>();
- try {
- future.complete(loadDocCollection(key));
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
+ (key, existingFuture) -> {
+ // A refresh is still in progress; return it.
+ if (existingFuture != null && !existingFuture.isDone()) {
+ return existingFuture;
+ }
+ // No refresh is in-progress, so trigger it.
+
+ if (ExecutorUtil.isShutdown(threadPool)) {
+ assert closed; // see close() for the sequence
+ ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(key);
+ DocCollection cached = cacheEntry == null ? null :
cacheEntry.cached;
+ return CompletableFuture.completedFuture(cached);
} else {
- future =
- CompletableFuture.supplyAsync(
- () -> {
- stateRefreshSemaphore.acquireUninterruptibly();
- try {
- return loadDocCollection(key);
- } finally {
- stateRefreshSemaphore.release();
- }
- },
- executor);
+ return CompletableFuture.supplyAsync(
+ () -> {
+ stateRefreshSemaphore.acquireUninterruptibly();
+ try {
+ return loadDocCollection(key);
+ } finally {
+ stateRefreshSemaphore.release();
+ // Remove the entry in case of many collections
+ collectionRefreshes.remove(key);
+ }
+ },
+ threadPool);
}
- future.whenCompleteAsync(
- (result, error) -> {
- collectionRefreshes.remove(key, future);
- });
- return future;
Review Comment:
It's much lighter weight & simpler to read to incorporate this into the
single lambda callback to occur after the collection is loaded.
I spent a fair amount of time previously trying to assure myself on the
nuances of whenComplete vs whenCompleteAsync, and on returning the result of
this future vs not, or having the outer method actually do this. Played with a
debugger to inspect threads; putting sleep in places and running tests. It was
educational but I concluded we can do something much simpler.
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1658,41 +1657,34 @@ protected DocCollection getDocCollection(String
collection, Integer expectedVers
}
private CompletableFuture<DocCollection> triggerCollectionRefresh(String
collection) {
- if (closed) {
- ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(collection);
- DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
- return CompletableFuture.completedFuture(cached);
- }
- return collectionRefreshes.computeIfAbsent(
+ return collectionRefreshes.compute(
collection,
- key -> {
- ExecutorService executor = threadPool;
- CompletableFuture<DocCollection> future;
- if (executor == null || ExecutorUtil.isShutdown(executor)) {
- future = new CompletableFuture<>();
- try {
- future.complete(loadDocCollection(key));
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
+ (key, existingFuture) -> {
+ // A refresh is still in progress; return it.
+ if (existingFuture != null && !existingFuture.isDone()) {
+ return existingFuture;
+ }
+ // No refresh is in-progress, so trigger it.
+
+ if (ExecutorUtil.isShutdown(threadPool)) {
+ assert closed; // see close() for the sequence
+ ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(key);
+ DocCollection cached = cacheEntry == null ? null :
cacheEntry.cached;
+ return CompletableFuture.completedFuture(cached);
} else {
- future =
- CompletableFuture.supplyAsync(
- () -> {
- stateRefreshSemaphore.acquireUninterruptibly();
- try {
- return loadDocCollection(key);
- } finally {
- stateRefreshSemaphore.release();
- }
- },
- executor);
+ return CompletableFuture.supplyAsync(
+ () -> {
+ stateRefreshSemaphore.acquireUninterruptibly();
+ try {
+ return loadDocCollection(key);
+ } finally {
+ stateRefreshSemaphore.release();
+ // Remove the entry in case of many collections
+ collectionRefreshes.remove(key);
Review Comment:
it should always remove the same future, by the way.
##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1658,41 +1657,34 @@ protected DocCollection getDocCollection(String
collection, Integer expectedVers
}
private CompletableFuture<DocCollection> triggerCollectionRefresh(String
collection) {
- if (closed) {
- ExpiringCachedDocCollection cacheEntry =
collectionStateCache.peek(collection);
- DocCollection cached = cacheEntry == null ? null : cacheEntry.cached;
- return CompletableFuture.completedFuture(cached);
- }
Review Comment:
I found it confusing for this method to have two code paths for the closed
scenario. So I centralized it to one spot within the map.compute call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]