rzo1 commented on code in PR #1544:
URL: https://github.com/apache/stormcrawler/pull/1544#discussion_r2118134333
##########
external/solr/src/main/java/org/apache/stormcrawler/solr/SolrConnection.java:
##########

@@ -32,55 +35,213 @@ import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.LBSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
 import org.apache.stormcrawler.util.ConfUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SolrConnection {
+    private static final Logger LOG = LoggerFactory.getLogger(SolrConnection.class);
 
-    private SolrClient client;
-    private SolrClient updateClient;
+    private final SolrClient client;
+    private final SolrClient updateClient;
 
-    private boolean cloud;
-    private String collection;
+    private final boolean cloud;
+    private final String collection;
+
+    private long lastUpdate;
+    private final Map<String, List<Update>> updateQueues;
+    private final Object lock = new Object();
+
+    private final int updateQueueSize = 10;
+
+    // Maximum time (ms) without updates before we flush all the queues
+    private final long noUpdateThreshold = 20_000;
+
+    // true if we deal with the sharded status collection
+    private final boolean statusCollection;
+
+    private final ScheduledExecutorService executor;
 
     private SolrConnection(
-            SolrClient client, SolrClient updateClient, boolean cloud, String collection) {
+            SolrClient client,
+            SolrClient updateClient,
+            boolean cloud,
+            String collection,
+            boolean statusCollection) {
         this.client = client;
         this.updateClient = updateClient;
         this.cloud = cloud;
         this.collection = collection;
+        this.statusCollection = statusCollection;
+
+        this.updateQueues = new HashMap<>();
+        this.lastUpdate = System.currentTimeMillis();
+        this.executor = Executors.newSingleThreadScheduledExecutor();
+
+        if (cloud) {
+            // Periodically check if we should flush
+            executor.scheduleAtFixedRate(
+                    () -> flushAllUpdates(false),
+                    noUpdateThreshold,
+                    noUpdateThreshold,
+                    TimeUnit.MILLISECONDS);
+        }
     }
 
-    public SolrClient getClient() {
-        return client;
+    private void flushAllUpdates(boolean force) {
+        synchronized (lock) {
+            if (!force && System.currentTimeMillis() - lastUpdate < noUpdateThreshold) return;
+
+            CloudHttp2SolrClient cloudHttp2SolrClient = (CloudHttp2SolrClient) client;
+            DocCollection col = cloudHttp2SolrClient.getClusterState().getCollection(collection);
+
+            // Flush all slices
+            for (var entry : updateQueues.entrySet()) {
+                List<Update> waitingUpdates = entry.getValue();
+                if (waitingUpdates.isEmpty()) continue;
+
+                Slice slice = col.getSlice(entry.getKey());
+                flushUpdates(slice, waitingUpdates, cloudHttp2SolrClient);
+            }
+        }
     }
 
-    public SolrClient getUpdateClient() {
-        return updateClient;
+    private void updateAsync(Update update) {
+        synchronized (lock) {
+            lastUpdate = System.currentTimeMillis();
+
+            CloudHttp2SolrClient cloudHttp2SolrClient = (CloudHttp2SolrClient) client;
+            DocCollection col = cloudHttp2SolrClient.getClusterState().getCollection(collection);
+
+            // Find slice for this update
+            Slice slice = null;
+
+            if (statusCollection) {
+                // We have multiple shards, find the correct one
+                slice = getSlice(((DocUpdate) update).doc, col);
+            } else {
+                slice = col.getActiveSlices().stream().findFirst().orElse(null);
+            }
+
+            if (slice == null) {
+                LOG.error("Could not find an active slice for update {}", update);
+                return;
+            }
+
+            // Get the queue for this slice
+            List<Update> waitingUpdates =
+                    updateQueues.computeIfAbsent(slice.getName(), k -> new ArrayList<>());
+            waitingUpdates.add(update);
+
+            if (waitingUpdates.size() == updateQueueSize) {
+                flushUpdates(slice, waitingUpdates, cloudHttp2SolrClient);
+            }
+        }
     }
+
+    /**
+     * Flush all waiting updates for this slice to the slice leader. The request will fail if the
+     * leader goes down before handling it.
+     */

Review Comment:
   They only get re-fetched if the user has enabled re-fetch on error, or didn't disable re-fetching at all :)
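
For context on the re-fetch behaviour mentioned in the comment: that decision is made by StormCrawler's scheduler configuration, not by `SolrConnection` itself. A minimal sketch, assuming the stock `DefaultScheduler` and its `fetchInterval.*` keys (key names and values as found in the archetype `crawler-conf.yaml`; treat them as illustrative, not authoritative):

```java
import java.util.HashMap;
import java.util.Map;

public class SchedulerConfExample {
    public static void main(String[] args) {
        // Intervals are in minutes; a negative value means "never re-fetch" for
        // URLs with that status, so errored status documents written by
        // SolrConnection would simply stay errored.
        Map<String, Object> conf = new HashMap<>();
        conf.put("fetchInterval.default", 1440); // successfully fetched pages
        conf.put("fetchInterval.fetch.error", 120); // transient fetch errors
        conf.put("fetchInterval.error", -1); // hard errors: re-fetching disabled
        System.out.println(conf);
    }
}
```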
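The body of `flushUpdates` is not part of this hunk. A minimal sketch of what such a method might look like, assuming the `Update`/`DocUpdate` wrappers referenced in `updateAsync` above and SolrJ's `Http2SolrClient`; the PR's actual implementation may differ:

```java
// Sketch only -- not the PR's implementation. Batches the queued updates into
// one UpdateRequest and sends it directly to the slice leader's core, which is
// why the request fails (rather than being rerouted) if the leader goes down.
// Assumes: import org.apache.solr.client.solrj.SolrServerException;
//          import org.apache.solr.client.solrj.impl.Http2SolrClient;
//          import java.io.IOException;
private void flushUpdates(
        Slice slice, List<Update> waitingUpdates, CloudHttp2SolrClient cloudClient) {
    // cloudClient kept to match the call sites above, unused in this sketch
    Replica leader = slice.getLeader();
    if (leader == null) {
        LOG.error("No leader known for slice {}", slice.getName());
        return;
    }
    UpdateRequest request = new UpdateRequest();
    for (Update update : waitingUpdates) {
        if (update instanceof DocUpdate) {
            request.add(((DocUpdate) update).doc);
        }
    }
    // leader.getCoreUrl() points at the leader replica's core
    try (Http2SolrClient leaderClient =
            new Http2SolrClient.Builder(leader.getCoreUrl()).build()) {
        request.process(leaderClient);
    } catch (SolrServerException | IOException e) {
        LOG.error(
                "Failed to flush {} updates for slice {}",
                waitingUpdates.size(),
                slice.getName(),
                e);
    }
    waitingUpdates.clear();
}
```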
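`getSlice` is also outside the hunk; presumably it asks the collection's router which shard owns the status document. A hedged guess, assuming the status documents are keyed by their `url` field:

```java
// Sketch only: delegate shard selection to Solr's own DocRouter so queued
// updates land on the same slice that an ordinary cloud update would hit.
private Slice getSlice(SolrInputDocument doc, DocCollection col) {
    String id = (String) doc.getFieldValue("url"); // assumption: url is the uniqueKey
    return col.getRouter().getTargetSlice(id, doc, null, null, col);
}
```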