The code snippet you provided is pretty dense!  I don’t have a direct answer, 
but do have a couple of thoughts.

1) Do you have a small demo project containing this code that you could 
share?  For an example, see 
https://github.com/epugh/playing-with-solr-streaming-expressions/, where I’ve 
used a simple Solr schema and documented what I’ve written. 

2) I notice you are building your streaming expression directly in Java code, 
and I think I read a blog post from Joel about how hard that is.  I have a PR 
out to add a bin/solr stream <mystream> command to the Solr CLI.  As part of 
that, I worked through how to parse a streaming expression and run it in Java 
code; 
https://github.com/apache/solr/pull/2479/files#diff-5b4ff2572d21841dad43f2ee976e52b5465606ba686b9da1a8c423815b897f4cR220
shows that code.  Do you need to write your streaming expression in Java, or 
could you write it as text and use the parsers?
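
If the text route is an option, here is a rough, untested sketch of parsing 
an expression string and running it with SolrJ's streaming classes.  The 
zkHost, collection name, and expression below are placeholders you would swap 
for your own:

import java.io.IOException;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

public class TextExpressionDemo {
    public static void main(String[] args) throws IOException {
        // Placeholders: point these at your own cluster and collection.
        String zkHost = "localhost:9983";
        String expr =
            "search(mycollection, q=\"*:*\", fl=\"id\", sort=\"id asc\", qt=\"/export\")";

        // Register the functions the expression uses, then parse the text form.
        StreamFactory factory = new StreamFactory()
            .withCollectionZkHost("mycollection", zkHost)
            .withFunctionName("search", CloudSolrStream.class);
        StreamExpression expression = StreamExpressionParser.parse(expr);

        StreamContext context = new StreamContext();
        SolrClientCache clientCache = new SolrClientCache();
        context.setSolrClientCache(clientCache);

        try (TupleStream stream = factory.constructStream(expression)) {
            stream.setStreamContext(context);
            stream.open();
            // Read tuples until the EOF marker tuple arrives.
            for (Tuple tuple = stream.read(); !tuple.EOF; tuple = stream.read()) {
                System.out.println(tuple.get("id"));
            }
        } finally {
            clientCache.close();
        }
    }
}

The nice thing about keeping the expression as text is that the exact same 
string can be pasted into the /stream handler for debugging, instead of 
living only inside Java builder code.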



Eric


> On Jun 26, 2024, at 8:13 PM, M E <ermma...@gmail.com> wrote:
> 
> Hi Solr Community,
> 
> I'm Marat, and I recently posted in the thread about executing multiple
> Solr queries in batch using the Streaming API. I've tried the code snippet
> discussed, but found that the performance is not what I expected: it is
> significantly slower than the standard query methods.
> 
> I'm looking to optimize the execution of batch queries without losing track
> of which response corresponds to which document or query. My main concerns
> are the efficient mapping of the initial clauses and whether a specific long
> boolean clause is necessary for proper execution.
> 
> Could anyone suggest adjustments or different approaches that might improve
> performance? Are there configurations or optimizations within Solr or the
> Streaming API that I might be missing?
> 
> Here's a brief overview of the method I've been testing:
> 
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.CompletableFuture;
> import java.util.function.Function;
> import java.util.function.Supplier;
> import java.util.stream.Collectors;
> import org.apache.solr.client.solrj.io.Tuple;
> import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
> import org.apache.solr.client.solrj.io.stream.StreamContext;
> import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
> import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
> import org.apache.solr.common.SolrDocument;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> public class StreamReader {
>     private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
> 
>     // Assumed to be supplied by the caller: a configured StreamFactory and a
>     // fresh StreamContext for each stream.
>     private final Supplier<StreamFactory> streamFactoryProvider;
>     private final Supplier<StreamContext> streamContextProvider;
> 
>     public StreamReader(Supplier<StreamFactory> streamFactoryProvider,
>                         Supplier<StreamContext> streamContextProvider) {
>         this.streamFactoryProvider = streamFactoryProvider;
>         this.streamContextProvider = streamContextProvider;
>     }
> 
>     // Runs one stream per task in parallel and groups the results by task id.
>     public CompletableFuture<Map<String, List<JoinFieldsWithStreamTaskResult>>> readMultipleStreams3(
>             List<JoinFieldsWithStreamTask> joinFieldsWithStreamTasks,
>             Function<Tuple, SolrDocument> mapper) {
>         List<CompletableFuture<Pair<String, JoinFieldsWithStreamTaskResult>>> futures =
>             joinFieldsWithStreamTasks.stream()
>                 .map(joinFieldsWithStreamTask -> joinFieldsWithStreamTask.getStreamTaskOpt()
>                     .map(task -> {
>                         long start = System.currentTimeMillis();
>                         return readStream(task.getExpression(), mapper)
>                             .thenApply(res -> {
>                                 long end = System.currentTimeMillis();
>                                 logger.info("Successfully read stream for joinFields: {}, took: {}ms",
>                                         joinFieldsWithStreamTask.getId(), end - start);
>                                 return new Pair<>(joinFieldsWithStreamTask.getId(),
>                                         new JoinFieldsWithStreamTaskResult(joinFieldsWithStreamTask, res));
>                             })
>                             .exceptionally(e -> {
>                                 logger.error("Failed to read stream for joinFields: {}",
>                                         joinFieldsWithStreamTask.getId(), e);
>                                 throw new RuntimeException(e);
>                             });
>                     })
>                     // No stream task: complete immediately with an empty result.
>                     .orElseGet(() -> CompletableFuture.completedFuture(
>                             new Pair<>(joinFieldsWithStreamTask.getId(),
>                                     new JoinFieldsWithStreamTaskResult(joinFieldsWithStreamTask, List.of())))))
>                 .collect(Collectors.toList());
> 
>         // Wait for every stream to finish, then group results by task id.
>         return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
>                 .thenApply(v -> futures.stream()
>                         .map(CompletableFuture::join)
>                         .collect(Collectors.groupingBy(Pair::getKey,
>                                 Collectors.mapping(Pair::getValue, Collectors.toList()))));
>     }
> 
>     // Opens the stream on a background thread and drains it until the EOF tuple.
>     private <T> CompletableFuture<List<T>> readStream(StreamExpression expression,
>             Function<Tuple, T> mapper) {
>         return CompletableFuture.supplyAsync(() -> {
>             try (CloudSolrStream stream = new CloudSolrStream(expression, streamFactoryProvider.get())) {
>                 stream.setStreamContext(streamContextProvider.get());
>                 stream.open();
>                 List<T> results = new ArrayList<>();
>                 for (Tuple tuple = stream.read(); !tuple.EOF; tuple = stream.read()) {
>                     results.add(mapper.apply(tuple));
>                 }
>                 return results;
>             } catch (Exception e) {
>                 logger.error("Error reading stream with expression: {}", expression, e);
>                 throw new RuntimeException(e);
>             }
>         });
>     }
> 
>     private static class Pair<K, V> {
>         private final K key;
>         private final V value;
> 
>         public Pair(K key, V value) {
>             this.key = key;
>             this.value = value;
>         }
> 
>         public K getKey() {
>             return key;
>         }
> 
>         public V getValue() {
>             return value;
>         }
>     }
> }
> 
> 
> 
> I appreciate any insights or recommendations on this topic.
> 
> Thanks,
> Marat

_______________________
Eric Pugh | Founder | OpenSource Connections, LLC | 434.466.1467 | 
http://www.opensourceconnections.com | My Free/Busy: http://tinyurl.com/eric-cal
Co-Author: Apache Solr Enterprise Search Server, 3rd Ed: 
https://www.packtpub.com/big-data-and-business-intelligence/apache-solr-enterprise-search-server-third-edition-raw

This e-mail and all contents, including attachments, are considered to be 
Company Confidential unless explicitly stated otherwise, regardless of whether 
attachments are marked as such.
