Hi Solr Community, I'm Marat, and I recently posted in the thread about executing multiple Solr queries in batch using the Streaming API. I tried the code snippet discussed there, but the performance is not what I expected: it is significantly slower than issuing the same queries through the standard request methods.
I'm looking to optimize batch query execution while still being able to map each response back to the query or document it came from. My main questions are about how to efficiently map the initial clauses, and whether the long boolean clause I'm building is actually necessary for correct execution. Could anyone suggest adjustments or different approaches that might improve performance? Are there configurations or optimizations within Solr or the Streaming API that I might be missing?

Here's the method I've been testing (I've replaced a custom LazyList helper with a plain read loop, and the StreamFactory/StreamContext providers are injected elsewhere):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamReader {

    private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);

    // Injected; supply a configured StreamFactory/StreamContext for each read.
    private Supplier<StreamFactory> streamFactoryProvider;
    private Supplier<StreamContext> streamContextProvider;

    public CompletableFuture<Map<String, List<JoinFieldsWithStreamTaskResult>>> readMultipleStreams3(
            List<JoinFieldsWithStreamTask> joinFieldsWithStreamTasks,
            Function<Tuple, SolrDocument> mapper) {
        List<CompletableFuture<Pair<String, JoinFieldsWithStreamTaskResult>>> futures =
            joinFieldsWithStreamTasks.stream()
                .map(joinFieldsWithStreamTask -> joinFieldsWithStreamTask.getStreamTaskOpt()
                    .map(task -> {
                        long start = System.currentTimeMillis();
                        return readStream(task.getExpression(), mapper)
                            .thenApply(res -> {
                                long end = System.currentTimeMillis();
                                logger.info("Successfully read stream for joinFields: {}, took: {}ms",
                                    joinFieldsWithStreamTask.getId(), end - start);
                                return new Pair<>(joinFieldsWithStreamTask.getId(),
                                    new JoinFieldsWithStreamTaskResult(joinFieldsWithStreamTask, res));
                            })
                            .exceptionally(e -> {
                                logger.error("Failed to read stream for joinFields: {}",
                                    joinFieldsWithStreamTask.getId(), e);
                                throw new RuntimeException(e);
                            });
                    })
                    .orElseGet(() -> CompletableFuture.completedFuture(
                        new Pair<>(joinFieldsWithStreamTask.getId(),
                            new JoinFieldsWithStreamTaskResult(joinFieldsWithStreamTask, List.of())))))
                .collect(Collectors.toList());

        // Wait for all reads, then group results by task id so each response
        // stays associated with the query that produced it.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.groupingBy(Pair::getKey,
                    Collectors.mapping(Pair::getValue, Collectors.toList()))));
    }

    private <T> CompletableFuture<List<T>> readStream(StreamExpression expression,
                                                      Function<Tuple, T> mapper) {
        return CompletableFuture.supplyAsync(() -> {
            try (CloudSolrStream stream = new CloudSolrStream(expression, streamFactoryProvider.get())) {
                stream.setStreamContext(streamContextProvider.get());
                stream.open();
                // Read tuples until the EOF marker tuple.
                List<T> results = new ArrayList<>();
                Tuple tuple = stream.read();
                while (!tuple.EOF) {
                    results.add(mapper.apply(tuple));
                    tuple = stream.read();
                }
                return results;
            } catch (Exception e) {
                logger.error("Error reading stream with expression: {}", expression, e);
                throw new RuntimeException(e);
            }
        });
    }

    private static class Pair<K, V> {
        private final K key;
        private final V value;

        Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }

        K getKey() { return key; }
        V getValue() { return value; }
    }
}

I appreciate any insights or recommendations on this topic. Thanks, Marat
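P.S. One thing I'm wondering about on the performance side: `CompletableFuture.supplyAsync` without an explicit executor runs on the shared `ForkJoinPool.commonPool()`, which is sized for CPU-bound work and can throttle many concurrent blocking stream reads. Below is a minimal, self-contained sketch (the class name and the fake "read" are hypothetical stand-ins, not my real Solr code) of passing a dedicated fixed-size pool to `supplyAsync` for blocking I/O:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchExecutorDemo {
    public static void main(String[] args) {
        // Dedicated pool sized for blocking I/O, instead of the shared
        // ForkJoinPool.commonPool() that supplyAsync uses by default.
        ExecutorService ioPool = Executors.newFixedThreadPool(8);
        try {
            List<CompletableFuture<String>> futures = IntStream.range(0, 4)
                .mapToObj(i -> CompletableFuture.supplyAsync(
                    // Stand-in for a blocking CloudSolrStream read.
                    () -> "result-" + i,
                    ioPool))
                .collect(Collectors.toList());

            // Joining in list order keeps results aligned with their queries.
            List<String> results = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
            System.out.println(results);
        } finally {
            ioPool.shutdown();
        }
    }
}

Would sizing a dedicated executor like this (roughly to the number of concurrent streams) be the recommended way to drive parallel CloudSolrStream reads, or is there a better-supported batching mechanism in the Streaming API itself?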