Hi Anthony,

Thank you for your patience, I needed some time to experiment and think about 
your feedback.

>* how realistic is it for type inference to be improved to the point that 
>usage of the Gatherers API wouldn't require type arguments? Both technically 
>and in terms of cost-benefit?

If looking at the past to inform extrapolation into the future, then the trend 
is going in the direction of improving over time.

>Gatherers.identity()

I still need some time to experiment with this, as there are some implications.

For instance, if you do: Steam.of(1).gather(Gatherers.identity()) you'd want 
that gatherer to be dropped since it is a no-op, but you can't really do that 
without breaking the contract of Stream.gather, as that operation should 
"consume" the original Stream reference and return a new one (to preserve 
stream building linearity), so you'd still need to create a new 
ReferencePipeline instance which is a copy of the current one, and mark the 
previous as consumed)—in essence Stream.gather(identity) wouldn't be a no-op.

There are some other, performance-related, things I'll need to verify as well 
before coming to any conclusion on this.

>Gatherers.concat(Stream<T> stream)

Creating such a Gatherer means that it is not reusable. You'd need to have a 
Supplier<Stream<T>>. Otherwise this happens:

jshell>     public static <T> Gatherer<T, ?, T> concat(Stream<T> newStream) {
   ...>         return Gatherer.of(
   ...>                 Gatherer.Integrator.ofGreedy((_, e, d) -> d.push(e)),
   ...>                 (_, d) -> newStream.sequential().allMatch(d::push)
   ...>         );
   ...>     }
   ...>
|  created method concat(Stream<T>)

jshell> var inject = concat(Stream.of(1,2))
inject ==> GathererImpl[initializer=DEFAULT, integrator=$Lam ... 
00001c00000db898@1068e947]

jshell> Stream.of(0).gather(inject.andThen(inject)).toList()
|  Exception java.lang.IllegalStateException: stream has already been operated 
upon or closed
|        at AbstractPipeline.evaluate (AbstractPipeline.java:260)
|        at ReferencePipeline.allMatch (ReferencePipeline.java:677)
|        at lambda$concat$1 (#4:4)
|        at Gatherers$Composite.lambda$impl$3 (Gatherers.java:611)
|        at GathererOp$GatherSink.end (GathererOp.java:181)
|        at AbstractPipeline.copyInto (AbstractPipeline.java:571)
|        at AbstractPipeline.wrapAndCopyInto (AbstractPipeline.java:560)
|        at AbstractPipeline.evaluate (AbstractPipeline.java:636)
|        at AbstractPipeline.evaluateToArrayNode (AbstractPipeline.java:291)
|        at ReferencePipeline.toArray (ReferencePipeline.java:656)
|        at ReferencePipeline.toArray (ReferencePipeline.java:662)
|        at ReferencePipeline.toList (ReferencePipeline.java:667)
|        at (#6:1)

That being said, given how little code it takes to implement something like 
that, I am not sure it warrants inclusion:

jshell>     public static <T> Gatherer<T, ?, T> concat(Supplier<? extends 
Stream<T>> newStream) {
   ...>         return Gatherer.of(
   ...>                 Gatherer.Integrator.ofGreedy((_, e, d) -> d.push(e)),
   ...>                 (_, d) -> newStream.get().sequential().allMatch(d::push)
   ...>         );
   ...>     }
|  created method concat(Supplier<? extends Stream<T>>)

jshell> var inject = concat(() -> Stream.of(1,2))
inject ==> GathererImpl[initializer=DEFAULT, integrator=$Lam ... 
00001c00000d9c70@1a052a00]

jshell> Stream.of(0).gather(inject.andThen(inject)).toList()
$1 ==> [0, 1, 2, 1, 2]

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Anthony Vanelverdinghe <d...@anthonyv.be>
Sent: Monday, 19 August 2024 20:37
To: Viktor Klang <viktor.kl...@oracle.com>; core-libs-dev@openjdk.org 
<core-libs-dev@openjdk.org>
Subject: Re: [External] : Re: Stream Gatherers (JEP 473) feedback

August 15, 2024 at 1:27 PM, "Viktor Klang" <viktor.kl...@oracle.com> wrote:
>
> Hi Anthony,

Hi Viktor

> Thanks for the input—it's much appreciated!
>
> Introducing yet another, user-facing, type parameter to get slightly improved 
> type inference is unfortunately for me a too high of a price to pay. Ideally, 
> type inference/unification will be improved over time making this issue go 
> away without impacting any signatures.

My arguments would be:
* the type parameter enables using subtypes of Downstream, e.g. 
`Gatherer::integrator` could return an `Integrator<A, T, R, 
SpecialDownstream<R>>`
* the type parameter improves type inference
* the type parameter would increase usability. In my experience, nearly all 
Gatherers are created through the factory methods in Gatherer. And thanks to 
the improved type inference, I assert that all factory method invocations would 
work without any type arguments at all. Nowadays type inference is so good that 
I found it remarkable how often (relatively speaking) I need to provide type 
arguments with Gatherers, compared to other generic APIs. A substantial amount 
of Java developers has probably never even had to provide type arguments 
before, so being able to eliminate their need from the Gatherers API as well 
seems like a considerable advantage to me
* how realistic is it for type inference to be improved to the point that usage 
of the Gatherers API wouldn't require type arguments? Both technically and in 
terms of cost-benefit?

> I'm warming up to the idea of shipping a Gatherers.identity(), and before 
> that happens I would like to see more use-cases where having such a thing 
> would provide a real edge. I can come up with a bunch of synthetic scenarios 
> where it's a win, but it's always better to see app logic numbers.

To summarize previous mails, my arguments are:
* it's a common Gatherer. Gatherers of the form `Gatherer<T, ?, T>` will likely 
have a degenerate case that is the identity. Some actual factory methods are 
`append(T... elements)` and `concat(Stream<T> stream)`, `prepend(T... 
elements)`, and `withInterpolationAt(Set<Instant> instants)`.
* optimization: if a Stream pipeline only contains compositions of 
`Gatherer.identity()`, the Gatherers can be eliminated entirely from the 
pipeline and characteristics can be propagated. So for example 
`list.stream().gather(withInterpolationAt(aSetThatHappensToBeEmpty)).count()` 
would be optimized to `list.stream().count()` and return instantly. Note that 
while a homemade implementation could optimize its `andThen` implementation, it 
wouldn't be able to optimize `Gatherer::andThen` and `Stream::gather`.
* API consistency: there's `Function.identity()`, so why not 
`Gatherers.identity()` (or `Gatherer.identity()`)? Actually I'd argue this 
method is more useful for Gatherers, since for Functions, this is often written 
as `o -> o` instead. For Gatherers there's no alternative like that.

On a final note, in case it hasn't been done yet, I'd like to propose 
`Gatherers.concat(Stream<T> stream)`. The current `Stream::concat` doesn't 
allow fluent/readable concatenation of multiple streams.

> Getting rid of the rawtypes in Value could be done, at any point since it 
> isn't exposed to user code. I'll keep this in mind for any upcoming 
> maintenance 👍
>
> Keep the feedback coming 🙂
>
> Cheers,
>
> √

Kind regards,
Anthony

> **Viktor Klang**
> Software Architect, Java Platform Group
>
> Oracle
>
> ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
>
> **From:** Anthony Vanelverdinghe <d...@anthonyv.be>
> **Sent:** Tuesday, 13 August 2024 18:32
> **To:** Viktor Klang <viktor.kl...@oracle.com>; core-libs-dev@openjdk.org 
> <core-libs-dev@openjdk.org>
> **Subject:** [External] : Re: Stream Gatherers (JEP 473) feedback
>
>
> Hi Viktor
>
> Your previous response inspired me to experiment some more with Gatherers
>
> As a result I'd like to make a couple propositions:
>
> * add an additional type parameter.
>
>   After doing so, type inference no longer needs any assistance:
>
>   `var maxGatherer = Gatherer.ofSequential(State::new, State::integrate, 
> State::finish);`
>
> * add an identity Gatherer with an optimized `andThen` implementation
>
>   as well as an optimization in the default implementation of 
> `Gatherer::andThen`
>
> * eliminate the use of raw types in `Gatherers.Value`
>
> Code that implements these propositions is in this gist: 
> https://urldefense.com/v3/__https://gist.github.com/anthonyvdotbe/37c85eaa86a7833051bc33f6fe88046c__;!!ACWV5N9M2RV99hQ!J9jmL_Q8cHhLAre5Oz5Dq3qafSXAQ2V8f-LrbjNY_tU4qSEx0LDudohXkxCugKiIJpm708DXqVct8oxUqg$
>
> Kind regards,
>
> Anthony
>
> July 31, 2024 at 7:58 PM, "Viktor Klang" <viktor.kl...@oracle.com> wrote:
>
> >
>
> > Hi Anthony,
>
> >
>
> > >The use case is a time series, which has methods to return a Stream of 
> > >data points, `record DataPoint(Instant, BigDecimal)`. In DataPoint, there 
> > >are several Gatherer factory methods, one of which is `Gatherer<DataPoint, 
> > >?, DataPoint> withInterpolationAt(NavigableSet<Instant> instants)`. If 
> > >`instants.isEmpty()`, it returns a no-op Gatherer. In general, I guess 
> > >most factory methods with a collection parameter (and compatible type 
> > >arguments for T and R) will have a degenerate case like this when the 
> > >collection is empty. `<T> Gatherer<T, ?, T> append(T... elements)` would 
> > >be another example.
>
> >
>
> > `identity()` would also allow an optimized `andThen` implementation, simply 
> > returning its argument. And when uncomposed, the Stream library could 
> > eliminate the `gather` stage, allowing characteristics to propogate in this 
> > case. So `treeSet.stream().gather(identity()).sorted().distinct().toList()` 
> > could be optimized to `treeSet.stream().toList()`.
>
> >
>
> > Have you experimented with implementing your own identity Gatherer and 
> > implemented its andThen to return the second argument?
>
> >
>
> > >That being said, I hadn't considered cases where an intermediate stage in 
> > >the pipeline would not propagate the characteristics. And in cases where 
> > >the intermediate stage doesn't affect the characteristics, it would 
> > >actually be worse to use something like `Gatherers.sorted().andThen(…)`, 
> > >instead of just keeping track of the previous element and throwing an 
> > >IllegalStateException if necessary.
>
> >
>
> > Yeah, that or implementing a reorder buffer Gatherer (in case you have 
> > unique and continuous sequence numbers).
>
> >
>
> > >This raises a new question though: on line 27 I'd expect I wouldn't need 
> > >to specify the type arguments for the `ofSequential` method invocation. Is 
> > >this hitting inherent limitations of type inference or is it possible that 
> > >some generic type bounds aren't as generic as they could be, prohibiting 
> > >type inference in certain cases?
>
> >
>
> > Yes, there are some limitations to inference, you can see usage examples in 
> > the implementation of Gatherers which does need some assistance to 
> > inference:https://urldefense.com/v3/__https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/stream/Gatherers.java__;!!ACWV5N9M2RV99hQ!J9jmL_Q8cHhLAre5Oz5Dq3qafSXAQ2V8f-LrbjNY_tU4qSEx0LDudohXkxCugKiIJpm708DXqVdv0LXetA$
>
> >
>
> > Cheers,
>
> >
>
> > √
>
> >
>
> > **Viktor Klang**
>
> > Software Architect, Java Platform Group
>
> >
>
> > Oracle
>
> >
>
> > ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
>
> >
>
> > **From:** Anthony Vanelverdinghe <d...@anthonyv.be>
>
> > **Sent:** Tuesday, 30 July 2024 17:20
>
> > **To:** Viktor Klang <viktor.kl...@oracle.com>; core-libs-dev@openjdk.org 
> > <core-libs-dev@openjdk.org>
>
> > **Subject:** [External] : Re: Stream Gatherers (JEP 473) feedback
>
> >
>
> >
>
> > July 29, 2024 at 8:08 PM, "Viktor Klang" <viktor.kl...@oracle.com> wrote:
>
> >

>
> > >
>
> >
>
> > > Hi Anthony,
>
> >
>
> > Hi Viktor
>
> >
>
> > > Thank you for your patience, and for providing feedback, it is always 
> > > much appreciated.
>
> >
>
> > >
>
> >
>
> > > >When writing factory methods for Gatherers, there's sometimes a
>
> >
>
> > > degenerate case that requires returning a no-op Gatherer. So I'd like a
>
> >
>
> > > way to mark a no-op Gatherer as such, allowing the Stream implementation
>
> >
>
> > > to recognize and eliminate it from the pipeline. One idea is to add
>
> >
>
> > > Gatherer.defaultIntegrator(), analogous to the other default… methods.
>
> >
>
> > > Another is to add Gatherers.identity(), analogous to Function.identity().
>
> >
>
> > >
>
> >
>
> > > I contemplated adding that but in the end I decided I didn't want to add 
> > > it for the sake of adding it,
>
> >
>
> > > but rather adding it in case it was deemed necessary.
>
> >
>
> > >
>
> >
>
> > > Do you have a concrete use-case (code) that you could share?
>
> >
>
> > The use case is a time series, which has methods to return a Stream of data 
> > points, `record DataPoint(Instant, BigDecimal)`. In DataPoint, there are 
> > several Gatherer factory methods, one of which is `Gatherer<DataPoint, ?, 
> > DataPoint> withInterpolationAt(NavigableSet<Instant> instants)`. If 
> > `instants.isEmpty()`, it returns a no-op Gatherer. In general, I guess most 
> > factory methods with a collection parameter (and compatible type arguments 
> > for T and R) will have a degenerate case like this when the collection is 
> > empty. `<T> Gatherer<T, ?, T> append(T... elements)` would be another 
> > example.
>
> >
>
> > `identity()` would also allow an optimized `andThen` implementation, simply 
> > returning its argument. And when uncomposed, the Stream library could 
> > eliminate the `gather` stage, allowing characteristics to propogate in this 
> > case. So `treeSet.stream().gather(identity()).sorted().distinct().toList()` 
> > could be optimized to `treeSet.stream().toList()`.
>
> >
>
> > > >Sometimes a factory method returns a Gatherer that only works correctly
>
> >
>
> > > if the upstream has certain characteristics, for example
>
> >
>
> > > Spliterator.SORTED or Spliterator.DISTINCT.
>
> >
>
> > >
>
> >
>
> > > Do you have a concrete use-case (code) that you could share?
>
> >
>
> > All the Streams that are returned from TimeSeries are well-formed: their 
> > data points are sorted and distinct. And the Gatherer factory methods in 
> > DataPoint assume that their upstreams have these characteristics. However, 
> > we can't prevent clients from constructing malformed Streams and invoking 
> > the Gatherers on them, which will give erroneous results. Now the Gatherer 
> > could keep track of the previous element and verify that the current 
> > element is greater than the previous. But the idea was to eliminate this 
> > bookkeeping for well-formed Streams, while still preventing erroneous 
> > results.
>
> >
>
> > > >One idea is to add methods
>
> >
>
> > > like Gatherers.sorted() and Gatherers.distinct(), where the Stream
>
> >
>
> > > implementation would be able to recognize and eliminate these from the
>
> >
>
> > > pipeline if the upstream already has these characteristics. That way
>
> >
>
> > > we'd be able to write `return Gatherers.sorted().andThen(…);`. Another
>
> >
>
> > > idea is to provide a Gatherer with a way to inspect the upstream
>
> >
>
> > > characteristics. If the upstream is missing the required
>
> >
>
> > > characteristic(s), it could then throw an IllegalStateException.
>
> >
>
> > I figured the latter idea isn't useful: the upstream might be sorted, even 
> > though it doesn't have the sorted characteristic. So it would be harsh for 
> > the Gatherer to throw an exception in that case.
>
> >
>
> > > For a rather long time Gatherer had characteristics, however,
>
> >
>
> > > what I noticed is that given composition of Gatherers what ended up 
> > > happening
>
> >
>
> > > almost always was that the combination of characteristics added overhead 
> > > and devolved into the empty set
>
> >
>
> > > real fast.
>
> >
>
> > >
>
> >
>
> > > Also, when it comes to things like sorted() and distinct(), they (by 
> > > necessity) have to get processed in full
>
> >
>
> > > before emitting anything downstream, which creates a lot of extra memory 
> > > allocation and doesn't lend themselves all that well to any depth-first 
> > > streaming.
>
> >
>
> > In the given use case, well-formed Streams would already have the sorted 
> > and distinct characteristics. So the idea was that the sorted() and 
> > distinct() gatherers would be eliminated from the pipeline entirely in 
> > those cases. Only malformed Streams would have to pay the cost of sorted() 
> > and distinct(), but that'd be an acceptable price for them to pay.
>
> >
>
> > That being said, I hadn't considered cases where an intermediate stage in 
> > the pipeline would not propagate the characteristics. And in cases where 
> > the intermediate stage doesn't affect the characteristics, it would 
> > actually be worse to use something like `Gatherers.sorted().andThen(…)`, 
> > instead of just keeping track of the previous element and throwing an 
> > IllegalStateException if necessary.
>
> >
>
> > > >The returns clause of Gatherer.Integrator::integrate just states "true
>
> >
>
> > > if subsequent integration is desired, false if not". In particular, it
>
> >
>
> > > doesn't document the behavior I'm observing, that returning false also
>
> >
>
> > > causes downstream to reject any further output elements.
>
> >
>
> > >
>
> >
>
> > > Do you have a test case? (There was a bug fixed in this area after 22 was 
> > > released, so you may want to test it on a 23-ea)
>
> >
>
> > I've uploaded a test case ( 
> > https://urldefense.com/v3/__https://gist.github.com/anthonyvdotbe/17e2285bb4f497ed91502b3c09b9a000__;!!ACWV5N9M2RV99hQ!K6tYLK81tcE53MJoE6Dy5VsdhRBlKjNSIbt2BZ-ymlsPWKXiD1FLu-nWwI8WoOyZWihFugQZw9kXEKupSw$
> >   ), but this is indeed already fixed in JDK 23-ea.
>
> >
>
> > This raises a new question though: on line 27 I'd expect I wouldn't need to 
> > specify the type arguments for the `ofSequential` method invocation. Is 
> > this hitting inherent limitations of type inference or is it possible that 
> > some generic type bounds aren't as generic as they could be, prohibiting 
> > type inference in certain cases?
>
> >
>
> > > Cheers,
>
> >
>
> > >
>
> >
>
> > > √
>
> >
>
> > >
>
> >
>
> > > **Viktor Klang**
>
> >
>
> > > Software Architect, Java Platform Group
>
> >
>
> > >
>
> >
>
> > > Oracle
>
> >
>
> > Kind regards,
>
> >
>
> > Anthony
>
> >
>
> > > ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
>
> >
>
> > >
>
> >
>
> > > **From:** core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of 
> > > Anthony Vanelverdinghe <d...@anthonyv.be>
>
> >
>
> > > **Sent:** Saturday, 27 July 2024 08:57
>
> >
>
> > > **To:** core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
>
> >
>
> > > **Subject:** Stream Gatherers (JEP 473) feedback
>
> >
>
> > >
>
> >
>
> > >
>
> >
>
> > > When writing factory methods for Gatherers, there's sometimes a
>
> >
>
> > >
>
> >
>
> > > degenerate case that requires returning a no-op Gatherer. So I'd like a
>
> >
>
> > >
>
> >
>
> > > way to mark a no-op Gatherer as such, allowing the Stream implementation
>
> >
>
> > >
>
> >
>
> > > to recognize and eliminate it from the pipeline. One idea is to add
>
> >
>
> > >
>
> >
>
> > > Gatherer.defaultIntegrator(), analogous to the other default… methods.
>
> >
>
> > >
>
> >
>
> > > Another is to add Gatherers.identity(), analogous to Function.identity().
>
> >
>
> > >
>
> >
>
> > > Sometimes a factory method returns a Gatherer that only works correctly
>
> >
>
> > >
>
> >
>
> > > if the upstream has certain characteristics, for example
>
> >
>
> > >
>
> >
>
> > > Spliterator.SORTED or Spliterator.DISTINCT. One idea is to add methods
>
> >
>
> > >
>
> >
>
> > > like Gatherers.sorted() and Gatherers.distinct(), where the Stream
>
> >
>
> > >
>
> >
>
> > > implementation would be able to recognize and eliminate these from the
>
> >
>
> > >
>
> >
>
> > > pipeline if the upstream already has these characteristics. That way
>
> >
>
> > >
>
> >
>
> > > we'd be able to write `return Gatherers.sorted().andThen(…);`. Another
>
> >
>
> > >
>
> >
>
> > > idea is to provide a Gatherer with a way to inspect the upstream
>
> >
>
> > >
>
> >
>
> > > characteristics. If the upstream is missing the required
>
> >
>
> > >
>
> >
>
> > > characteristic(s), it could then throw an IllegalStateException.
>
> >
>
> > >
>
> >
>
> > > The returns clause of Gatherer.Integrator::integrate just states "true
>
> >
>
> > >
>
> >
>
> > > if subsequent integration is desired, false if not". In particular, it
>
> >
>
> > >
>
> >
>
> > > doesn't document the behavior I'm observing, that returning false also
>
> >
>
> > >
>
> >
>
> > > causes downstream to reject any further output elements.
>
> >
>
> > >
>
> >
>
> > > In the Implementation Requirements section of Gatherer, rephrasing
>
> >
>
> > >
>
> >
>
> > > "Outputs and state later in the input sequence will be discarded if
>
> >
>
> > >
>
> >
>
> > > processing an earlier partition short-circuits." to something like the
>
> >
>
> > >
>
> >
>
> > > following would be clearer to me: "As soon as any partition
>
> >
>
> > >
>
> >
>
> > > short-circuits, the whole Gatherer short-circuits. The state of other
>
> >
>
> > >
>
> >
>
> > > partitions is discarded, i.e. there are no further invocations of the
>
> >
>
> > >
>
> >
>
> > > combiner. The finisher is invoked with the short-circuiting partition's
>
> >
>
> > >
>
> >
>
> > > state." I wouldn't mention discarding of outputs, since that's implied
>
> >
>
> > >
>
> >
>
> > > by the act of short-circuiting.
>
> >
>
> > >
>
> >
>
> > > Anthony
>
> >
>
> > >
>
> >
>

Reply via email to