Hi Viktor

Your previous response inspired me to experiment some more with Gatherers
As a result I'd like to make a couple propositions:

* add an additional type parameter.
  After doing so, type inference no longer needs any assistance:
  `var maxGatherer = Gatherer.ofSequential(State::new, State::integrate, 
State::finish);`
* add an identity Gatherer with an optimized `andThen` implementation
  as well as an optimization in the default implementation of 
`Gatherer::andThen`
* eliminate the use of raw types in `Gatherers.Value`

Code that implements these propositions is in this gist: 
https://gist.github.com/anthonyvdotbe/37c85eaa86a7833051bc33f6fe88046c

Kind regards,
Anthony

July 31, 2024 at 7:58 PM, "Viktor Klang" <viktor.kl...@oracle.com> wrote:
> 
> Hi Anthony,
> 
> >The use case is a time series, which has methods to return a Stream of data 
> >points, `record DataPoint(Instant, BigDecimal)`. In DataPoint, there are 
> >several Gatherer factory methods, one of which is `Gatherer<DataPoint, ?, 
> >DataPoint> withInterpolationAt(NavigableSet<Instant> instants)`. If 
> >`instants.isEmpty()`, it returns a no-op Gatherer. In general, I guess most 
> >factory methods with a collection parameter (and compatible type arguments 
> >for T and R) will have a degenerate case like this when the collection is 
> >empty. `<T> Gatherer<T, ?, T> append(T... elements)` would be another 
> >example.
> 
> `identity()` would also allow an optimized `andThen` implementation, simply 
> returning its argument. And when uncomposed, the Stream library could 
> eliminate the `gather` stage, allowing characteristics to propogate in this 
> case. So `treeSet.stream().gather(identity()).sorted().distinct().toList()` 
> could be optimized to `treeSet.stream().toList()`.
> 
> Have you experimented with implementing your own identity Gatherer and 
> implemented its andThen to return the second argument?
> 
> >That being said, I hadn't considered cases where an intermediate stage in 
> >the pipeline would not propagate the characteristics. And in cases where the 
> >intermediate stage doesn't affect the characteristics, it would actually be 
> >worse to use something like `Gatherers.sorted().andThen(…)`, instead of just 
> >keeping track of the previous element and throwing an IllegalStateException 
> >if necessary.
> 
> Yeah, that or implementing a reorder buffer Gatherer (in case you have unique 
> and continuous sequence numbers).
> 
> >This raises a new question though: on line 27 I'd expect I wouldn't need to 
> >specify the type arguments for the `ofSequential` method invocation. Is this 
> >hitting inherent limitations of type inference or is it possible that some 
> >generic type bounds aren't as generic as they could be, prohibiting type 
> >inference in certain cases?
> 
> Yes, there are some limitations to inference, you can see usage examples in 
> the implementation of Gatherers which does need some assistance to 
> inference:https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/stream/Gatherers.java
> 
> Cheers,
> 
> √
> 
> **Viktor Klang**
> Software Architect, Java Platform Group
> 
> Oracle
> 
> ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
> 
> **From:** Anthony Vanelverdinghe <d...@anthonyv.be>
> **Sent:** Tuesday, 30 July 2024 17:20
> **To:** Viktor Klang <viktor.kl...@oracle.com>; core-libs-dev@openjdk.org 
> <core-libs-dev@openjdk.org>
> **Subject:** [External] : Re: Stream Gatherers (JEP 473) feedback
>  
> 
> July 29, 2024 at 8:08 PM, "Viktor Klang" <viktor.kl...@oracle.com> wrote:
> 
> >
> 
> > Hi Anthony,
> 
> Hi Viktor
> 
> > Thank you for your patience, and for providing feedback, it is always much 
> > appreciated.
> 
> >
> 
> > >When writing factory methods for Gatherers, there's sometimes a
> 
> > degenerate case that requires returning a no-op Gatherer. So I'd like a
> 
> > way to mark a no-op Gatherer as such, allowing the Stream implementation
> 
> > to recognize and eliminate it from the pipeline. One idea is to add
> 
> > Gatherer.defaultIntegrator(), analogous to the other default… methods.
> 
> > Another is to add Gatherers.identity(), analogous to Function.identity().
> 
> >
> 
> > I contemplated adding that but in the end I decided I didn't want to add it 
> > for the sake of adding it,
> 
> > but rather adding it in case it was deemed necessary.
> 
> >
> 
> > Do you have a concrete use-case (code) that you could share?
> 
> The use case is a time series, which has methods to return a Stream of data 
> points, `record DataPoint(Instant, BigDecimal)`. In DataPoint, there are 
> several Gatherer factory methods, one of which is `Gatherer<DataPoint, ?, 
> DataPoint> withInterpolationAt(NavigableSet<Instant> instants)`. If 
> `instants.isEmpty()`, it returns a no-op Gatherer. In general, I guess most 
> factory methods with a collection parameter (and compatible type arguments 
> for T and R) will have a degenerate case like this when the collection is 
> empty. `<T> Gatherer<T, ?, T> append(T... elements)` would be another example.
> 
> `identity()` would also allow an optimized `andThen` implementation, simply 
> returning its argument. And when uncomposed, the Stream library could 
> eliminate the `gather` stage, allowing characteristics to propogate in this 
> case. So `treeSet.stream().gather(identity()).sorted().distinct().toList()` 
> could be optimized to `treeSet.stream().toList()`.
> 
> > >Sometimes a factory method returns a Gatherer that only works correctly
> 
> > if the upstream has certain characteristics, for example
> 
> > Spliterator.SORTED or Spliterator.DISTINCT.
> 
> >
> 
> > Do you have a concrete use-case (code) that you could share?
> 
> All the Streams that are returned from TimeSeries are well-formed: their data 
> points are sorted and distinct. And the Gatherer factory methods in DataPoint 
> assume that their upstreams have these characteristics. However, we can't 
> prevent clients from constructing malformed Streams and invoking the 
> Gatherers on them, which will give erroneous results. Now the Gatherer could 
> keep track of the previous element and verify that the current element is 
> greater than the previous. But the idea was to eliminate this bookkeeping for 
> well-formed Streams, while still preventing erroneous results.
> 
> > >One idea is to add methods
> 
> > like Gatherers.sorted() and Gatherers.distinct(), where the Stream
> 
> > implementation would be able to recognize and eliminate these from the
> 
> > pipeline if the upstream already has these characteristics. That way
> 
> > we'd be able to write `return Gatherers.sorted().andThen(…);`. Another
> 
> > idea is to provide a Gatherer with a way to inspect the upstream
> 
> > characteristics. If the upstream is missing the required
> 
> > characteristic(s), it could then throw an IllegalStateException.
> 
> I figured the latter idea isn't useful: the upstream might be sorted, even 
> though it doesn't have the sorted characteristic. So it would be harsh for 
> the Gatherer to throw an exception in that case.
> 
> > For a rather long time Gatherer had characteristics, however,
> 
> > what I noticed is that given composition of Gatherers what ended up 
> > happening
> 
> > almost always was that the combination of characteristics added overhead 
> > and devolved into the empty set
> 
> > real fast.
> 
> >
> 
> > Also, when it comes to things like sorted() and distinct(), they (by 
> > necessity) have to get processed in full
> 
> > before emitting anything downstream, which creates a lot of extra memory 
> > allocation and doesn't lend themselves all that well to any depth-first 
> > streaming.
> 
> In the given use case, well-formed Streams would already have the sorted and 
> distinct characteristics. So the idea was that the sorted() and distinct() 
> gatherers would be eliminated from the pipeline entirely in those cases. Only 
> malformed Streams would have to pay the cost of sorted() and distinct(), but 
> that'd be an acceptable price for them to pay.
> 
> That being said, I hadn't considered cases where an intermediate stage in the 
> pipeline would not propagate the characteristics. And in cases where the 
> intermediate stage doesn't affect the characteristics, it would actually be 
> worse to use something like `Gatherers.sorted().andThen(…)`, instead of just 
> keeping track of the previous element and throwing an IllegalStateException 
> if necessary.
> 
> > >The returns clause of Gatherer.Integrator::integrate just states "true
> 
> > if subsequent integration is desired, false if not". In particular, it
> 
> > doesn't document the behavior I'm observing, that returning false also
> 
> > causes downstream to reject any further output elements.
> 
> >
> 
> > Do you have a test case? (There was a bug fixed in this area after 22 was 
> > released, so you may want to test it on a 23-ea)
> 
> I've uploaded a test case ( 
> https://urldefense.com/v3/__https://gist.github.com/anthonyvdotbe/17e2285bb4f497ed91502b3c09b9a000__;!!ACWV5N9M2RV99hQ!K6tYLK81tcE53MJoE6Dy5VsdhRBlKjNSIbt2BZ-ymlsPWKXiD1FLu-nWwI8WoOyZWihFugQZw9kXEKupSw$
>   ), but this is indeed already fixed in JDK 23-ea.
> 
> This raises a new question though: on line 27 I'd expect I wouldn't need to 
> specify the type arguments for the `ofSequential` method invocation. Is this 
> hitting inherent limitations of type inference or is it possible that some 
> generic type bounds aren't as generic as they could be, prohibiting type 
> inference in certain cases?
> 
> > Cheers,
> 
> >
> 
> > √
> 
> >
> 
> > **Viktor Klang**
> 
> > Software Architect, Java Platform Group
> 
> >
> 
> > Oracle
> 
> Kind regards,
> 
> Anthony
> 
> > ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
> 
> >
> 
> > **From:** core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of 
> > Anthony Vanelverdinghe <d...@anthonyv.be>
> 
> > **Sent:** Saturday, 27 July 2024 08:57
> 
> > **To:** core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> 
> > **Subject:** Stream Gatherers (JEP 473) feedback
> 
> >  
> 
> >
> 
> > When writing factory methods for Gatherers, there's sometimes a
> 
> >
> 
> > degenerate case that requires returning a no-op Gatherer. So I'd like a
> 
> >
> 
> > way to mark a no-op Gatherer as such, allowing the Stream implementation
> 
> >
> 
> > to recognize and eliminate it from the pipeline. One idea is to add
> 
> >
> 
> > Gatherer.defaultIntegrator(), analogous to the other default… methods.
> 
> >
> 
> > Another is to add Gatherers.identity(), analogous to Function.identity().
> 
> >
> 
> > Sometimes a factory method returns a Gatherer that only works correctly
> 
> >
> 
> > if the upstream has certain characteristics, for example
> 
> >
> 
> > Spliterator.SORTED or Spliterator.DISTINCT. One idea is to add methods
> 
> >
> 
> > like Gatherers.sorted() and Gatherers.distinct(), where the Stream
> 
> >
> 
> > implementation would be able to recognize and eliminate these from the
> 
> >
> 
> > pipeline if the upstream already has these characteristics. That way
> 
> >
> 
> > we'd be able to write `return Gatherers.sorted().andThen(…);`. Another
> 
> >
> 
> > idea is to provide a Gatherer with a way to inspect the upstream
> 
> >
> 
> > characteristics. If the upstream is missing the required
> 
> >
> 
> > characteristic(s), it could then throw an IllegalStateException.
> 
> >
> 
> > The returns clause of Gatherer.Integrator::integrate just states "true
> 
> >
> 
> > if subsequent integration is desired, false if not". In particular, it
> 
> >
> 
> > doesn't document the behavior I'm observing, that returning false also
> 
> >
> 
> > causes downstream to reject any further output elements.
> 
> >
> 
> > In the Implementation Requirements section of Gatherer, rephrasing
> 
> >
> 
> > "Outputs and state later in the input sequence will be discarded if
> 
> >
> 
> > processing an earlier partition short-circuits." to something like the
> 
> >
> 
> > following would be clearer to me: "As soon as any partition
> 
> >
> 
> > short-circuits, the whole Gatherer short-circuits. The state of other
> 
> >
> 
> > partitions is discarded, i.e. there are no further invocations of the
> 
> >
> 
> > combiner. The finisher is invoked with the short-circuiting partition's
> 
> >
> 
> > state." I wouldn't mention discarding of outputs, since that's implied
> 
> >
> 
> > by the act of short-circuiting.
> 
> >
> 
> > Anthony
> 
> >
>

Reply via email to