dsmiley commented on code in PR #3296: URL: https://github.com/apache/solr/pull/3296#discussion_r2020311938
########## solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java: ########## @@ -264,15 +265,6 @@ public Tuple read() throws IOException { } } - private ModifiableSolrParams getParams(Map<String, String> props) { Review Comment: unrelated small improvement; this method isn't needed ########## solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java: ########## @@ -352,56 +354,49 @@ public Tuple read() throws IOException { numDocs += (Integer) resp.get("numDocs"); - for (int i = 0; i < shardTopTerms.size(); i++) { - String term = shardTopTerms.getName(i); - double score = shardTopTerms.getVal(i); - int docFreq = shardDocFreqs.get(term); - double prevScore = termScores.containsKey(term) ? termScores.get(term) : 0; - long prevDocFreq = docFreqs.containsKey(term) ? docFreqs.get(term) : 0; - termScores.put(term, prevScore + score); - docFreqs.put(term, prevDocFreq + docFreq); - } - } - - List<Tuple> tuples = new ArrayList<>(numTerms); - termScores = sortByValue(termScores); - int index = 0; - for (Map.Entry<String, Double> termScore : termScores.entrySet()) { - if (tuples.size() == numTerms) break; - index++; - Tuple tuple = new Tuple(); - tuple.put(ID, featureSet + "_" + index); - tuple.put("index_i", index); - tuple.put("term_s", termScore.getKey()); - tuple.put("score_f", termScore.getValue()); - tuple.put("featureSet_s", featureSet); - long docFreq = docFreqs.get(termScore.getKey()); - double d = Math.log(((double) numDocs / (double) (docFreq + 1))); - tuple.put("idf_d", d); - tuples.add(tuple); + shardTopTerms.forEach( + (term, score) -> { + int docFreq = shardDocFreqs.get(term); + termScores.merge(term, score, Double::sum); + docFreqs.merge(term, (long) docFreq, Long::sum); + }); } - - tuples.add(Tuple.EOF()); - - tupleIterator = tuples.iterator(); + final long numDocsF = numDocs; // make final + + final AtomicInteger idGen = new AtomicInteger(1); + + tupleIterator = + 
termScores.entrySet().stream() + .sorted( // sort by score descending + Comparator.<Map.Entry<String, Double>>comparingDouble(Entry::getValue) + .reversed()) + .limit(numTerms) + .map( + (termScore) -> { + int index = idGen.getAndIncrement(); + Tuple tuple = new Tuple(); + tuple.put(ID, featureSet + "_" + index); + tuple.put("index_i", index); + tuple.put("term_s", termScore.getKey()); + tuple.put("score_f", termScore.getValue()); + tuple.put("featureSet_s", featureSet); + long docFreq = docFreqs.get(termScore.getKey()); + double d = Math.log(((double) numDocsF / (double) (docFreq + 1))); + tuple.put("idf_d", d); + return tuple; + }) + .iterator(); + } + if (tupleIterator.hasNext()) { + return tupleIterator.next(); + } else { + return Tuple.EOF(); } - - return tupleIterator.next(); } catch (Exception e) { throw new IOException(e); } } - private <K, V extends Comparable<? super V>> Map<K, V> sortByValue(Map<K, V> map) { Review Comment: this was sad; building a new intermediate LinkedHashMap that was ultimately unnecessary as this PR shows ########## solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java: ########## @@ -352,56 +354,49 @@ public Tuple read() throws IOException { numDocs += (Integer) resp.get("numDocs"); - for (int i = 0; i < shardTopTerms.size(); i++) { - String term = shardTopTerms.getName(i); - double score = shardTopTerms.getVal(i); - int docFreq = shardDocFreqs.get(term); - double prevScore = termScores.containsKey(term) ? termScores.get(term) : 0; - long prevDocFreq = docFreqs.containsKey(term) ? 
docFreqs.get(term) : 0; - termScores.put(term, prevScore + score); - docFreqs.put(term, prevDocFreq + docFreq); - } - } - - List<Tuple> tuples = new ArrayList<>(numTerms); - termScores = sortByValue(termScores); - int index = 0; - for (Map.Entry<String, Double> termScore : termScores.entrySet()) { - if (tuples.size() == numTerms) break; - index++; - Tuple tuple = new Tuple(); - tuple.put(ID, featureSet + "_" + index); - tuple.put("index_i", index); - tuple.put("term_s", termScore.getKey()); - tuple.put("score_f", termScore.getValue()); - tuple.put("featureSet_s", featureSet); - long docFreq = docFreqs.get(termScore.getKey()); - double d = Math.log(((double) numDocs / (double) (docFreq + 1))); - tuple.put("idf_d", d); - tuples.add(tuple); + shardTopTerms.forEach( + (term, score) -> { + int docFreq = shardDocFreqs.get(term); + termScores.merge(term, score, Double::sum); + docFreqs.merge(term, (long) docFreq, Long::sum); + }); } - - tuples.add(Tuple.EOF()); - - tupleIterator = tuples.iterator(); + final long numDocsF = numDocs; // make final + + final AtomicInteger idGen = new AtomicInteger(1); + + tupleIterator = + termScores.entrySet().stream() + .sorted( // sort by score descending + Comparator.<Map.Entry<String, Double>>comparingDouble(Entry::getValue) + .reversed()) + .limit(numTerms) + .map( + (termScore) -> { + int index = idGen.getAndIncrement(); + Tuple tuple = new Tuple(); + tuple.put(ID, featureSet + "_" + index); + tuple.put("index_i", index); + tuple.put("term_s", termScore.getKey()); + tuple.put("score_f", termScore.getValue()); + tuple.put("featureSet_s", featureSet); + long docFreq = docFreqs.get(termScore.getKey()); + double d = Math.log(((double) numDocsF / (double) (docFreq + 1))); + tuple.put("idf_d", d); + return tuple; + }) + .iterator(); + } + if (tupleIterator.hasNext()) { + return tupleIterator.next(); + } else { + return Tuple.EOF(); } - - return tupleIterator.next(); } catch (Exception e) { Review Comment: I didn't touch this but I'm very 
tempted. This try-catch is sloppy for multiple reasons. First, catching `Exception` in particular should be done sparingly. This code wraps RuntimeExceptions and even existing IOExceptions as an IOException. Surely this isn't right. Furthermore, this try-catch is so all-encompassing, including logic like the iterator stuff that shouldn't belong within its super long scope. It overall makes the method hard to read (cyclomatic complexity). ########## solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java: ########## @@ -352,56 +354,49 @@ public Tuple read() throws IOException { numDocs += (Integer) resp.get("numDocs"); - for (int i = 0; i < shardTopTerms.size(); i++) { - String term = shardTopTerms.getName(i); - double score = shardTopTerms.getVal(i); - int docFreq = shardDocFreqs.get(term); - double prevScore = termScores.containsKey(term) ? termScores.get(term) : 0; - long prevDocFreq = docFreqs.containsKey(term) ? docFreqs.get(term) : 0; - termScores.put(term, prevScore + score); - docFreqs.put(term, prevDocFreq + docFreq); - } - } - - List<Tuple> tuples = new ArrayList<>(numTerms); - termScores = sortByValue(termScores); - int index = 0; - for (Map.Entry<String, Double> termScore : termScores.entrySet()) { - if (tuples.size() == numTerms) break; - index++; - Tuple tuple = new Tuple(); - tuple.put(ID, featureSet + "_" + index); - tuple.put("index_i", index); - tuple.put("term_s", termScore.getKey()); - tuple.put("score_f", termScore.getValue()); - tuple.put("featureSet_s", featureSet); - long docFreq = docFreqs.get(termScore.getKey()); - double d = Math.log(((double) numDocs / (double) (docFreq + 1))); - tuple.put("idf_d", d); - tuples.add(tuple); + shardTopTerms.forEach( + (term, score) -> { + int docFreq = shardDocFreqs.get(term); + termScores.merge(term, score, Double::sum); + docFreqs.merge(term, (long) docFreq, Long::sum); Review Comment: merge is so elegant here; I had to ##########
solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java: ########## @@ -352,56 +354,49 @@ public Tuple read() throws IOException { numDocs += (Integer) resp.get("numDocs"); - for (int i = 0; i < shardTopTerms.size(); i++) { - String term = shardTopTerms.getName(i); - double score = shardTopTerms.getVal(i); - int docFreq = shardDocFreqs.get(term); - double prevScore = termScores.containsKey(term) ? termScores.get(term) : 0; - long prevDocFreq = docFreqs.containsKey(term) ? docFreqs.get(term) : 0; - termScores.put(term, prevScore + score); - docFreqs.put(term, prevDocFreq + docFreq); - } - } - - List<Tuple> tuples = new ArrayList<>(numTerms); - termScores = sortByValue(termScores); - int index = 0; - for (Map.Entry<String, Double> termScore : termScores.entrySet()) { - if (tuples.size() == numTerms) break; - index++; - Tuple tuple = new Tuple(); - tuple.put(ID, featureSet + "_" + index); - tuple.put("index_i", index); - tuple.put("term_s", termScore.getKey()); - tuple.put("score_f", termScore.getValue()); - tuple.put("featureSet_s", featureSet); - long docFreq = docFreqs.get(termScore.getKey()); - double d = Math.log(((double) numDocs / (double) (docFreq + 1))); - tuple.put("idf_d", d); - tuples.add(tuple); + shardTopTerms.forEach( + (term, score) -> { + int docFreq = shardDocFreqs.get(term); + termScores.merge(term, score, Double::sum); + docFreqs.merge(term, (long) docFreq, Long::sum); + }); } - - tuples.add(Tuple.EOF()); - - tupleIterator = tuples.iterator(); + final long numDocsF = numDocs; // make final + + final AtomicInteger idGen = new AtomicInteger(1); + + tupleIterator = + termScores.entrySet().stream() + .sorted( // sort by score descending + Comparator.<Map.Entry<String, Double>>comparingDouble(Entry::getValue) + .reversed()) + .limit(numTerms) + .map( + (termScore) -> { + int index = idGen.getAndIncrement(); + Tuple tuple = new Tuple(); + tuple.put(ID, featureSet + "_" + index); + tuple.put("index_i", index); + 
tuple.put("term_s", termScore.getKey()); + tuple.put("score_f", termScore.getValue()); + tuple.put("featureSet_s", featureSet); + long docFreq = docFreqs.get(termScore.getKey()); + double d = Math.log(((double) numDocsF / (double) (docFreq + 1))); + tuple.put("idf_d", d); + return tuple; + }) + .iterator(); + } + if (tupleIterator.hasNext()) { + return tupleIterator.next(); + } else { + return Tuple.EOF(); Review Comment: This approach means the caller could in theory keep calling read() and we'd keep returning EOF instead of abruptly ending with some sort of exception (ISE?). I figure this is fine? ########## solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java: ########## @@ -261,15 +262,6 @@ public Tuple read() throws IOException { } } - private ModifiableSolrParams getParams(Map<String, String> props) { Review Comment: unrelated small improvement; this method isn't needed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For additional commands, e-mail: issues-h...@solr.apache.org