mvolikas commented on code in PR #1488:
URL: 
https://github.com/apache/incubator-stormcrawler/pull/1488#discussion_r1976647679


##########
external/solr/src/main/java/org/apache/stormcrawler/solr/SolrConnection.java:
##########
@@ -17,88 +17,127 @@
 package org.apache.stormcrawler.solr;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
+import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
-import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
 import org.apache.stormcrawler.util.ConfUtils;
 
 public class SolrConnection {
 
     private SolrClient client;
-    private UpdateRequest request;
+    private SolrClient updateClient;
 
-    private SolrConnection(SolrClient sc, UpdateRequest r) {
-        client = sc;
-        request = r;
+    private static boolean cloud;
+    private static String collection;
+
+    private SolrConnection(SolrClient client, SolrClient updateClient) {
+        this.client = client;
+        this.updateClient = updateClient;
     }
 
     public SolrClient getClient() {
         return client;
     }
 
-    public UpdateRequest getRequest() {
-        return request;
+    public SolrClient getUpdateClient() {
+        return updateClient;
     }
 
-    public static SolrClient getClient(Map stormConf, String boltType) {
+    public CompletableFuture<QueryResponse> requestAsync(QueryRequest request) 
{
+        if (cloud) {
+            CloudHttp2SolrClient cloudHttp2SolrClient = (CloudHttp2SolrClient) 
client;
+
+            // Get the Solr endpoints
+            Collection<Slice> activeSlices =
+                    cloudHttp2SolrClient
+                            .getClusterState()
+                            .getCollection(collection)
+                            .getActiveSlices();
+
+            List<LBSolrClient.Endpoint> endpoints = new ArrayList<>();
+            for (Slice slice : activeSlices) {
+                for (Replica replica : slice.getReplicas()) {
+                    if (replica.getState() == Replica.State.ACTIVE) {
+                        endpoints.add(new 
LBSolrClient.Endpoint(replica.getBaseUrl(), collection));
+                    }
+                }
+            }
+
+            // Shuffle the endpoints for basic load balancing
+            Collections.shuffle(endpoints);
+
+            // Get the async client
+            LBHttp2SolrClient lbHttp2SolrClient = 
cloudHttp2SolrClient.getLbClient();
+            LBSolrClient.Req req = new LBSolrClient.Req(request, endpoints);
+
+            return lbHttp2SolrClient
+                    .requestAsync(req)
+                    .thenApply(rsp -> new QueryResponse(rsp.getResponse(), 
lbHttp2SolrClient));
+        } else {
+            return ((Http2SolrClient) client)
+                    .requestAsync(request)
+                    .thenApply(nl -> new QueryResponse(nl, client));
+        }
+    }
+
+    public static SolrConnection getConnection(Map<String, Object> stormConf, 
String boltType) {
+        collection = ConfUtils.getString(stormConf, "solr." + boltType + 
".collection", null);
         String zkHost = ConfUtils.getString(stormConf, "solr." + boltType + 
".zkhost", null);
-        String solrUrl = ConfUtils.getString(stormConf, "solr." + boltType + 
".url", null);
-        String collection =
-                ConfUtils.getString(stormConf, "solr." + boltType + 
".collection", null);
-        int queueSize = ConfUtils.getInt(stormConf, "solr." + boltType + 
".queueSize", -1);
 
-        SolrClient client;
+        String solrUrl = ConfUtils.getString(stormConf, "solr." + boltType + 
".url", null);
+        int queueSize = ConfUtils.getInt(stormConf, "solr." + boltType + 
".queueSize", 100);
 
         if (StringUtils.isNotBlank(zkHost)) {
-            client = new 
CloudSolrClient.Builder(Collections.singletonList(zkHost)).build();
+            cloud = true;
+
+            CloudHttp2SolrClient.Builder builder =
+                    new 
CloudHttp2SolrClient.Builder(Collections.singletonList(zkHost));
+
             if (StringUtils.isNotBlank(collection)) {
-                ((CloudSolrClient) client).setDefaultCollection(collection);
+                builder.withDefaultCollection(collection);
             }
-        } else if (StringUtils.isNotBlank(solrUrl)) {
-            if (queueSize == -1) {
-                client = new Http2SolrClient.Builder(solrUrl).build();
-            } else {
-                client =
-                        new ConcurrentUpdateSolrClient.Builder(solrUrl)
-                                .withQueueSize(queueSize)
-                                .build();
-            }
-        } else {
-            throw new RuntimeException("SolrClient should have zk or solr URL 
set up");
-        }
 
-        return client;
-    }
+            CloudHttp2SolrClient cloudHttp2SolrClient = builder.build();

Review Comment:
   CloudHttp2SolrClient does not support `requestAsync` directly the way 
Http2SolrClient does, which is why I used its wrapped LBHttp2SolrClient instead.
   Additionally, it does not support batched updates the way 
ConcurrentUpdateSolrClient does. The only workaround I can think of is to 
implement the batching manually.
   I will ask on the Solr users mailing list whether either of these features 
is planned for a future Solr version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@stormcrawler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to