The code snippet you provided is pretty dense! I don’t have a direct answer, but do have a couple of thoughts.
1) Do you have a small demo project that this code could be part of that you could share? An example is https://github.com/epugh/playing-with-solr-streaming-expressions/ where I've used a simple Solr schema and documented what I've written.

2) I notice you are using direct Java code, and I think I read a blog post from Joel about how hard that is… I have a PR out to add a bin/solr stream <mystream> command to the Solr CLI. As part of that, I worked through how to parse a streaming expression and run it in Java code. https://github.com/apache/solr/pull/2479/files#diff-5b4ff2572d21841dad43f2ee976e52b5465606ba686b9da1a8c423815b897f4cR220 shows that code. Do you need to write your streaming expression in Java, or could you write it in text and use the parsers?
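As a rough, untested sketch of that text-first approach (not the exact code from the PR; the collection name "mycollection", the ZooKeeper address localhost:9983, and the example search() expression are just placeholders), it would look something like this:

import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

public class RunTextExpression {
    public static void main(String[] args) throws Exception {
        // The expression is plain text, exactly what you would send to the /stream handler.
        String expr = "search(mycollection, q=\"*:*\", fl=\"id\", sort=\"id asc\", qt=\"/export\")";

        // Register the functions the expression uses and tell the factory where ZooKeeper lives.
        StreamFactory factory = new StreamFactory()
            .withCollectionZkHost("mycollection", "localhost:9983")
            .withFunctionName("search", CloudSolrStream.class);

        // Parse the text into a StreamExpression and let the factory build the stream.
        StreamExpression expression = StreamExpressionParser.parse(expr);

        StreamContext context = new StreamContext();
        SolrClientCache clientCache = new SolrClientCache();
        context.setSolrClientCache(clientCache);

        try (TupleStream stream = factory.constructStream(expression)) {
            stream.setStreamContext(context);
            stream.open();
            for (Tuple tuple = stream.read(); !tuple.EOF; tuple = stream.read()) {
                System.out.println(tuple.getString("id"));
            }
        } finally {
            clientCache.close();
        }
    }
}

If you don't want to call the parser yourself, StreamFactory also has a constructStream(String) overload that does the parse for you; you just register whatever additional function names your expressions use on the factory.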
Eric

> On Jun 26, 2024, at 8:13 PM, M E <ermma...@gmail.com> wrote:
> 
> Hi Solr Community,
> 
> I'm Marat, and I recently posted in the thread about executing multiple
> Solr queries in batch using the Streaming API. I've tried the code snippet
> discussed but found that the performance is not as expected—it's
> significantly slower compared to standard methods.
> 
> I'm looking to optimize the execution of batch queries without losing track
> of which response corresponds to which document or query. The main concerns
> are about the efficient mapping of initial clauses and whether a specific
> long boolean clause is necessary for proper execution.
> 
> Could anyone suggest adjustments or different approaches that might improve
> performance? Are there configurations or optimizations within Solr or the
> Streaming API that I might be missing?
> 
> Here's a brief overview of the method I've been testing:
> 
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ExecutionException;
> import java.util.function.Function;
> import java.util.stream.Collectors;
> import org.apache.solr.client.solrj.io.Tuple;
> import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
> import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
> import org.apache.solr.common.SolrDocument;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> public class StreamReader {
>     private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
> 
>     public CompletableFuture<Map<String, List<JoinFieldsWithStreamTaskResult>>> readMultipleStreams3(
>             List<JoinFieldsWithStreamTask> joinFieldsWithStreamTasks, Function<Tuple, SolrDocument> mapper) {
>         List<CompletableFuture<Pair<String, JoinFieldsWithStreamTaskResult>>> futures =
>             joinFieldsWithStreamTasks.stream()
>                 .map(joinFieldsWithStreamTask -> joinFieldsWithStreamTask.getStreamTaskOpt()
>                     .map(task -> {
>                         long start = System.currentTimeMillis();
>                         return readStream(task.getExpression(), mapper)
>                             .thenApply(res -> {
>                                 long end = System.currentTimeMillis();
>                                 logger.info("Successfully read stream for joinFields: {}, took: {}ms",
>                                     joinFieldsWithStreamTask.getId(), end - start);
>                                 return new Pair<>(joinFieldsWithStreamTask.getId(),
>                                     new JoinFieldsWithStreamTaskResult(joinFieldsWithStreamTask, res));
>                             })
>                             .exceptionally(e -> {
>                                 logger.error("Failed to read stream for joinFields: {}",
>                                     joinFieldsWithStreamTask.getId(), e);
>                                 throw new RuntimeException(e);
>                             });
>                     })
>                     .orElseGet(() -> CompletableFuture.completedFuture(new Pair<>(
>                         joinFieldsWithStreamTask.getId(),
>                         new JoinFieldsWithStreamTaskResult(joinFieldsWithStreamTask, List.of()))))
>                 ).collect(Collectors.toList());
> 
>         return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
>             .thenApply(v -> futures.stream()
>                 .map(CompletableFuture::join)
>                 .collect(Collectors.groupingBy(Pair::getKey,
>                     Collectors.mapping(Pair::getValue, Collectors.toList()))));
>     }
> 
>     private <T> CompletableFuture<List<T>> readStream(StreamExpression expression, Function<Tuple, T> mapper) {
>         return CompletableFuture.supplyAsync(() -> {
>             try (CloudSolrStream stream = new CloudSolrStream(expression, streamFactoryProvider.get())) {
>                 stream.setStreamContext(streamContextProvider.get());
>                 stream.open();
>                 return LazyList.generate(() -> stream.read())
>                     .takeWhile(tuple -> !tuple.EOF)
>                     .map(mapper)
>                     .collect(Collectors.toList());
>             } catch (Exception e) {
>                 logger.error("Error reading stream with expression: {}", expression, e);
>                 throw new RuntimeException(e);
>             }
>         });
>     }
> 
>     private static class Pair<K, V> {
>         private K key;
>         private V value;
> 
>         public Pair(K key, V value) {
>             this.key = key;
>             this.value = value;
>         }
> 
>         public K getKey() {
>             return key;
>         }
> 
>         public V getValue() {
>             return value;
>         }
>     }
> }
> 
> I appreciate any insights or recommendations on this topic.
> 
> Thanks,
> Marat

_______________________
Eric Pugh | Founder | OpenSource Connections, LLC | 434.466.1467 | http://www.opensourceconnections.com | My Free/Busy <http://tinyurl.com/eric-cal>
Co-Author: Apache Solr Enterprise Search Server, 3rd Ed <https://www.packtpub.com/big-data-and-business-intelligence/apache-solr-enterprise-search-server-third-edition-raw>