Hi Viktor

Your previous response inspired me to experiment some more with Gatherers. As a result, I'd like to make a couple of propositions:

* add an additional type parameter. After doing so, type inference no longer needs any assistance: `var maxGatherer = Gatherer.ofSequential(State::new, State::integrate, State::finish);`
* add an identity Gatherer with an optimized `andThen` implementation, as well as an optimization in the default implementation of `Gatherer::andThen` (a rough sketch follows below)
* eliminate the use of raw types in `Gatherers.Value`

Code that implements these propositions is in this gist: https://gist.github.com/anthonyvdotbe/37c85eaa86a7833051bc33f6fe88046c
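To make the second proposition concrete, here is a minimal sketch of the kind of identity Gatherer I have in mind (simplified for this mail, not the exact code from the gist; `Identity` is just a placeholder name):

```java
import java.util.Objects;
import java.util.stream.Gatherer;

// Sketch of a no-op Gatherer: it pushes every element downstream unchanged,
// and its andThen simply returns the other Gatherer, so composing with it
// adds no pipeline stage.
final class Identity<T> implements Gatherer<T, Void, T> {

    static <T> Gatherer<T, ?, T> identity() {
        return new Identity<>();
    }

    @Override
    public Integrator<Void, T, T> integrator() {
        // Greedy pass-through: never short-circuits on its own.
        return Integrator.<Void, T, T>ofGreedy(
                (state, element, downstream) -> downstream.push(element));
    }

    @Override
    @SuppressWarnings("unchecked")
    public <RR> Gatherer<T, ?, RR> andThen(Gatherer<? super T, ?, ? extends RR> that) {
        Objects.requireNonNull(that);
        // identity().andThen(that) is just that
        return (Gatherer<T, ?, RR>) that;
    }
}
```

With something like this, a factory such as `withInterpolationAt(instants)` can simply return the identity Gatherer in its degenerate case, and if the Stream implementation recognized the identity instance, the corresponding `gather` stage could be dropped from the pipeline entirely.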
Kind regards,

Anthony

July 31, 2024 at 7:58 PM, "Viktor Klang" <viktor.kl...@oracle.com> wrote:

> > Hi Anthony,

> > >The use case is a time series, which has methods to return a Stream of data > >points, `record DataPoint(Instant, BigDecimal)`. In DataPoint, there are > >several Gatherer factory methods, one of which is `Gatherer<DataPoint, ?, > >DataPoint> withInterpolationAt(NavigableSet<Instant> instants)`. If > >`instants.isEmpty()`, it returns a no-op Gatherer. In general, I guess most > >factory methods with a collection parameter (and compatible type arguments > >for T and R) will have a degenerate case like this when the collection is > >empty. `<T> Gatherer<T, ?, T> append(T... elements)` would be another > >example.

> > `identity()` would also allow an optimized `andThen` implementation, simply > returning its argument. And when uncomposed, the Stream library could > eliminate the `gather` stage, allowing characteristics to propagate in this > case. So `treeSet.stream().gather(identity()).sorted().distinct().toList()` > could be optimized to `treeSet.stream().toList()`.

> > Have you experimented with implementing your own identity Gatherer and > implemented its andThen to return the second argument?

> > >That being said, I hadn't considered cases where an intermediate stage in > >the pipeline would not propagate the characteristics. And in cases where the > >intermediate stage doesn't affect the characteristics, it would actually be > >worse to use something like `Gatherers.sorted().andThen(…)`, instead of just > >keeping track of the previous element and throwing an IllegalStateException > >if necessary.

> > Yeah, that or implementing a reorder buffer Gatherer (in case you have unique > and continuous sequence numbers).

> > >This raises a new question though: on line 27 I'd expect I wouldn't need to > >specify the type arguments for the `ofSequential` method invocation. Is this > >hitting inherent limitations of type inference or is it possible that some > >generic type bounds aren't as generic as they could be, prohibiting type > >inference in certain cases?

> > Yes, there are some limitations to inference, you can see usage examples in > the implementation of Gatherers which does need some assistance to > inference: https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/stream/Gatherers.java

> > Cheers,

> > √

> > **Viktor Klang**
> Software Architect, Java Platform Group

> > Oracle

> > ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯

> > **From:** Anthony Vanelverdinghe <d...@anthonyv.be>
> **Sent:** Tuesday, 30 July 2024 17:20
> **To:** Viktor Klang <viktor.kl...@oracle.com>; core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> **Subject:** [External] : Re: Stream Gatherers (JEP 473) feedback

> > > July 29, 2024 at 8:08 PM, "Viktor Klang" <viktor.kl...@oracle.com> wrote:

> > > > > > Hi Anthony,

> > Hi Viktor

> > > Thank you for your patience, and for providing feedback, it is always much > > appreciated.
> > > > > > >When writing factory methods for Gatherers, there's sometimes a > > > degenerate case that requires returning a no-op Gatherer. So I'd like a > > > way to mark a no-op Gatherer as such, allowing the Stream implementation > > > to recognize and eliminate it from the pipeline. One idea is to add > > > Gatherer.defaultIntegrator(), analogous to the other default… methods. > > > Another is to add Gatherers.identity(), analogous to Function.identity().

> > > > > > I contemplated adding that but in the end I decided I didn't want to add it > > for the sake of adding it, > > > but rather adding it in case it was deemed necessary.

> > > > > > Do you have a concrete use-case (code) that you could share?

> > The use case is a time series, which has methods to return a Stream of data > points, `record DataPoint(Instant, BigDecimal)`. In DataPoint, there are > several Gatherer factory methods, one of which is `Gatherer<DataPoint, ?, > DataPoint> withInterpolationAt(NavigableSet<Instant> instants)`. If > `instants.isEmpty()`, it returns a no-op Gatherer. In general, I guess most > factory methods with a collection parameter (and compatible type arguments > for T and R) will have a degenerate case like this when the collection is > empty. `<T> Gatherer<T, ?, T> append(T... elements)` would be another example.

> > `identity()` would also allow an optimized `andThen` implementation, simply > returning its argument. And when uncomposed, the Stream library could > eliminate the `gather` stage, allowing characteristics to propagate in this > case. So `treeSet.stream().gather(identity()).sorted().distinct().toList()` > could be optimized to `treeSet.stream().toList()`.

> > > >Sometimes a factory method returns a Gatherer that only works correctly > > > if the upstream has certain characteristics, for example > > > Spliterator.SORTED or Spliterator.DISTINCT.

> > > > > > Do you have a concrete use-case (code) that you could share?

> > All the Streams that are returned from TimeSeries are well-formed: their data > points are sorted and distinct. And the Gatherer factory methods in DataPoint > assume that their upstreams have these characteristics. However, we can't > prevent clients from constructing malformed Streams and invoking the > Gatherers on them, which will give erroneous results. Now the Gatherer could > keep track of the previous element and verify that the current element is > greater than the previous. But the idea was to eliminate this bookkeeping for > well-formed Streams, while still preventing erroneous results.

> > > >One idea is to add methods > > > like Gatherers.sorted() and Gatherers.distinct(), where the Stream > > > implementation would be able to recognize and eliminate these from the > > > pipeline if the upstream already has these characteristics. That way > > > we'd be able to write `return Gatherers.sorted().andThen(…);`. Another > > > idea is to provide a Gatherer with a way to inspect the upstream > > > characteristics. If the upstream is missing the required > > > characteristic(s), it could then throw an IllegalStateException.

> > I figured the latter idea isn't useful: the upstream might be sorted, even > though it doesn't have the sorted characteristic. So it would be harsh for > the Gatherer to throw an exception in that case.
> > > For a rather long time Gatherer had characteristics, however, > > > what I noticed is that given composition of Gatherers what ended up > > happening > > > almost always was that the combination of characteristics added overhead > > and devolved into the empty set > > > real fast.

> > > > > > Also, when it comes to things like sorted() and distinct(), they (by > > necessity) have to get processed in full > > > before emitting anything downstream, which creates a lot of extra memory > > allocation and doesn't lend themselves all that well to any depth-first > > streaming.

> > In the given use case, well-formed Streams would already have the sorted and > distinct characteristics. So the idea was that the sorted() and distinct() > gatherers would be eliminated from the pipeline entirely in those cases. Only > malformed Streams would have to pay the cost of sorted() and distinct(), but > that'd be an acceptable price for them to pay.

> > That being said, I hadn't considered cases where an intermediate stage in the > pipeline would not propagate the characteristics. And in cases where the > intermediate stage doesn't affect the characteristics, it would actually be > worse to use something like `Gatherers.sorted().andThen(…)`, instead of just > keeping track of the previous element and throwing an IllegalStateException > if necessary.

> > > >The returns clause of Gatherer.Integrator::integrate just states "true > > > if subsequent integration is desired, false if not". In particular, it > > > doesn't document the behavior I'm observing, that returning false also > > > causes downstream to reject any further output elements.

> > > > > > Do you have a test case? (There was a bug fixed in this area after 22 was > > released, so you may want to test it on a 23-ea)

> > I've uploaded a test case (https://gist.github.com/anthonyvdotbe/17e2285bb4f497ed91502b3c09b9a000), but this is indeed already fixed in JDK 23-ea.

> > This raises a new question though: on line 27 I'd expect I wouldn't need to > specify the type arguments for the `ofSequential` method invocation. Is this > hitting inherent limitations of type inference or is it possible that some > generic type bounds aren't as generic as they could be, prohibiting type > inference in certain cases?

> > > Cheers,

> > > √

> > > **Viktor Klang**
> > > Software Architect, Java Platform Group

> > > Oracle

> > Kind regards,

> > Anthony

> > > ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯

> > > **From:** core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of > > Anthony Vanelverdinghe <d...@anthonyv.be>
> > > **Sent:** Saturday, 27 July 2024 08:57
> > > **To:** core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> > > **Subject:** Stream Gatherers (JEP 473) feedback

> > > > > > When writing factory methods for Gatherers, there's sometimes a degenerate case that requires returning a no-op Gatherer. So I'd like a way to mark a no-op Gatherer as such, allowing the Stream implementation to recognize and eliminate it from the pipeline. One idea is to add Gatherer.defaultIntegrator(), analogous to the other default… methods. Another is to add Gatherers.identity(), analogous to Function.identity().
> > > > > > Sometimes a factory method returns a Gatherer that only works correctly if the upstream has certain characteristics, for example Spliterator.SORTED or Spliterator.DISTINCT. One idea is to add methods like Gatherers.sorted() and Gatherers.distinct(), where the Stream implementation would be able to recognize and eliminate these from the pipeline if the upstream already has these characteristics. That way we'd be able to write `return Gatherers.sorted().andThen(…);`. Another idea is to provide a Gatherer with a way to inspect the upstream characteristics. If the upstream is missing the required characteristic(s), it could then throw an IllegalStateException.

> > > > > > The returns clause of Gatherer.Integrator::integrate just states "true if subsequent integration is desired, false if not". In particular, it doesn't document the behavior I'm observing, that returning false also causes downstream to reject any further output elements.

> > > > > > In the Implementation Requirements section of Gatherer, rephrasing "Outputs and state later in the input sequence will be discarded if processing an earlier partition short-circuits." to something like the following would be clearer to me: "As soon as any partition short-circuits, the whole Gatherer short-circuits. The state of other partitions is discarded, i.e. there are no further invocations of the combiner. The finisher is invoked with the short-circuiting partition's state." I wouldn't mention discarding of outputs, since that's implied by the act of short-circuiting.

> > > > > > Anthony
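P.S. For reference, the "keep track of the previous element" bookkeeping mentioned in the quoted discussion could look roughly like the sketch below. It is only illustrative: the class and method names (`Preconditions`, `requireStrictlyIncreasing`) and the local `State` class are made up for this mail, not taken from the actual TimeSeries/DataPoint code.

```java
import java.util.stream.Gatherer;

final class Preconditions {

    // Verifies that upstream elements arrive in strictly increasing order
    // (i.e. sorted and distinct), passing them through unchanged and failing
    // fast with an IllegalStateException otherwise.
    static <T extends Comparable<? super T>> Gatherer<T, ?, T> requireStrictlyIncreasing() {
        class State { T previous; }
        return Gatherer.<T, State, T>ofSequential(
                State::new,
                Gatherer.Integrator.<State, T, T>ofGreedy((state, element, downstream) -> {
                    if (state.previous != null && state.previous.compareTo(element) >= 0) {
                        throw new IllegalStateException("upstream is not sorted and distinct");
                    }
                    state.previous = element;
                    return downstream.push(element);
                }));
    }
}
```

As noted above, for well-formed Streams this bookkeeping is pure overhead, which is exactly what the characteristics-based elimination was meant to avoid.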