Hi Solr Community,

I'm Marat, and I recently posted in the thread about executing multiple
Solr queries in batch using the Streaming API. I tried the code snippet
discussed there, but it performs worse than expected: it is significantly
slower than standard methods.

I want to speed up batch query execution without losing track of which
response corresponds to which document or query. My two main concerns are
how to map the initial clauses efficiently, and whether a specific long
boolean clause is actually necessary for correct execution.

Could anyone suggest adjustments or different approaches that might improve
performance? Are there configurations or optimizations within Solr or the
Streaming API that I might be missing?

Here's a brief overview of the method I've been testing:

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.common.SolrDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

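public class StreamReader {
    // streamFactoryProvider, streamContextProvider, LazyList and the
    // JoinFieldsWithStreamTask* types are defined elsewhere and not shown here.
    private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);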

    public CompletableFuture<Map<String, List<JoinFieldsWithStreamTaskResult>>> readMultipleStreams3(
            List<JoinFieldsWithStreamTask> joinFieldsWithStreamTasks,
            Function<Tuple, SolrDocument> mapper) {
        // One future per task; a task without a stream expression completes
        // immediately with an empty result.
        List<CompletableFuture<Pair<String, JoinFieldsWithStreamTaskResult>>> futures =
                joinFieldsWithStreamTasks.stream()
            .map(joinFieldsWithStreamTask -> joinFieldsWithStreamTask.getStreamTaskOpt()
                .map(task -> {
                    long start = System.currentTimeMillis();
                    return readStream(task.getExpression(), mapper)
                        .thenApply(res -> {
                            long end = System.currentTimeMillis();
                            logger.info("Successfully read stream for joinFields: {}, took: {}ms",
                                joinFieldsWithStreamTask.getId(), end - start);
                            return new Pair<>(joinFieldsWithStreamTask.getId(),
                                new JoinFieldsWithStreamTaskResult(joinFieldsWithStreamTask, res));
                        })
                        .exceptionally(e -> {
                            logger.error("Failed to read stream for joinFields: {}",
                                joinFieldsWithStreamTask.getId(), e);
                            throw new RuntimeException(e);
                        });
                })
                .orElseGet(() -> CompletableFuture.completedFuture(
                    new Pair<>(joinFieldsWithStreamTask.getId(),
                        new JoinFieldsWithStreamTaskResult(joinFieldsWithStreamTask, List.of()))))
            ).collect(Collectors.toList());

        // Wait for every read to finish, then group the results by task id.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.groupingBy(Pair::getKey,
                                Collectors.mapping(Pair::getValue, Collectors.toList()))));
    }

    private <T> CompletableFuture<List<T>> readStream(StreamExpression expression,
            Function<Tuple, T> mapper) {
        // No executor is passed, so by default this blocking read runs on
        // the common ForkJoinPool.
        return CompletableFuture.supplyAsync(() -> {
            try (CloudSolrStream stream =
                    new CloudSolrStream(expression, streamFactoryProvider.get())) {
                stream.setStreamContext(streamContextProvider.get());
                stream.open();
                // Read tuples until the EOF marker tuple, mapping each one.
                return LazyList.generate(() -> stream.read())
                    .takeWhile(tuple -> !tuple.EOF)
                    .map(mapper)
                    .collect(Collectors.toList());
            } catch (Exception e) {
                logger.error("Error reading stream with expression: {}", expression, e);
                throw new RuntimeException(e);
            }
        });
    }

    private static class Pair<K, V> {
        private K key;
        private V value;

        public Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K getKey() {
            return key;
        }

        public V getValue() {
            return value;
        }
    }
}
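
One variation that may be worth comparing, offered as a minimal sketch and
not as a confirmed fix: supplyAsync without an executor argument runs on
the common ForkJoinPool by default, which is sized for CPU-bound work, so
many blocking stream reads could end up queuing behind each other. The
sketch below runs each blocking read on a dedicated fixed-size pool and
keeps every result keyed by the id that produced it. The names
BatchReadSketch, readAll and poolSize are hypothetical, blockingRead is a
stand-in for the readStream call above, and the sketch assumes ids are
unique. It has no Solr dependency, so it only illustrates the threading
and the id-to-result mapping.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class BatchReadSketch {

    // Runs one blocking read per id on a dedicated pool and returns the
    // results keyed by id, in submission order. Assumes ids are unique.
    static <R> Map<String, R> readAll(List<String> ids,
                                      Function<String, R> blockingRead,
                                      int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Submit everything first so the reads can overlap.
            Map<String, CompletableFuture<R>> pending = new LinkedHashMap<>();
            for (String id : ids) {
                pending.put(id,
                    CompletableFuture.supplyAsync(() -> blockingRead.apply(id), pool));
            }
            // Then collect; join() rethrows a failed read as CompletionException.
            Map<String, R> results = new LinkedHashMap<>();
            pending.forEach((id, future) -> results.put(id, future.join()));
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a stream read: one fake document per id.
        Map<String, List<String>> out =
            readAll(List.of("q1", "q2", "q3"), id -> List.of(id + "-doc"), 3);
        System.out.println(out);
    }
}
```

Whether this helps depends on where the time actually goes; if the Solr
side is the bottleneck, a larger client-side pool would not change much.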



I appreciate any insights or recommendations on this topic.

Thanks,
Marat
