mvolikas commented on code in PR #1544: URL: https://github.com/apache/stormcrawler/pull/1544#discussion_r2147817925
########## external/solr/src/main/java/org/apache/stormcrawler/solr/SolrConnection.java: ########## @@ -32,55 +35,237 @@ import org.apache.solr.client.solrj.impl.LBHttp2SolrClient; import org.apache.solr.client.solrj.impl.LBSolrClient; import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.storm.shade.org.apache.commons.lang.StringUtils; import org.apache.stormcrawler.util.ConfUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SolrConnection { + private static final Logger LOG = LoggerFactory.getLogger(SolrConnection.class); - private SolrClient client; - private SolrClient updateClient; + private final SolrClient client; + private final SolrClient updateClient; - private boolean cloud; - private String collection; + private final boolean cloud; + private final String collection; + + private long lastUpdate; + private final Map<String, List<Update>> updateQueues; + private final Object lock = new Object(); + + private final int updateQueueSize; + + // Maximum time (ms) without updates before we flush all the queues + private final long noUpdateThreshold; + + // true if we deal with the sharded status collection + private final boolean statusCollection; + + private final ScheduledExecutorService executor; private SolrConnection( - SolrClient client, SolrClient updateClient, boolean cloud, String collection) { + SolrClient client, + SolrClient updateClient, + boolean cloud, + String collection, + boolean statusCollection, + int updateQueueSize, + long noUpdateThreshold) { this.client = client; this.updateClient = updateClient; this.cloud = cloud; this.collection = collection; + this.statusCollection = 
statusCollection; + + this.updateQueues = new HashMap<>(); + this.lastUpdate = System.currentTimeMillis(); + this.executor = Executors.newSingleThreadScheduledExecutor(); + + this.updateQueueSize = updateQueueSize; + this.noUpdateThreshold = noUpdateThreshold; + + if (cloud) { + // Periodically check if we should flush + executor.scheduleAtFixedRate( + () -> flushAllUpdates(false), + noUpdateThreshold, + noUpdateThreshold, + TimeUnit.MILLISECONDS); + } } - public SolrClient getClient() { - return client; + private void flushAllUpdates(boolean force) { + synchronized (lock) { + if (!force && System.currentTimeMillis() - lastUpdate < noUpdateThreshold) return; + + CloudHttp2SolrClient cloudHttp2SolrClient = (CloudHttp2SolrClient) client; + DocCollection col = cloudHttp2SolrClient.getClusterState().getCollection(collection); + + // Flush all slices + for (var entry : updateQueues.entrySet()) { + List<Update> waitingUpdates = entry.getValue(); + if (waitingUpdates.isEmpty()) continue; + + Slice slice = col.getSlice(entry.getKey()); + Replica leader = slice.getLeader(); + + if (leader == null) { + LOG.error("Could not find the leader for slice {}", slice.getName()); + return; + } + + flushUpdates(leader, waitingUpdates, cloudHttp2SolrClient); + } + } } - public SolrClient getUpdateClient() { - return updateClient; + private void updateAsync(Update update) { + synchronized (lock) { + lastUpdate = System.currentTimeMillis(); + + CloudHttp2SolrClient cloudHttp2SolrClient = (CloudHttp2SolrClient) client; + DocCollection col = cloudHttp2SolrClient.getClusterState().getCollection(collection); + + // Find slice for this update + Slice slice = null; + + if (statusCollection) { + // We have multiple shards, find the correct one + slice = getSlice(((DocUpdate) update).doc, col); + } else { + slice = col.getActiveSlices().stream().findFirst().orElse(null); + } + + if (slice == null) { + LOG.error("Could not find an active slice for update {}", update); Review Comment: This should 
not happen unless a shard is being split, which, in turn, should not happen in the context of StormCrawler while the crawl is running. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscribe@stormcrawler.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org