Copilot commented on code in PR #2684:
URL: https://github.com/apache/pekko/pull/2684#discussion_r2888724044
##########
stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala:
##########
@@ -525,6 +525,24 @@ object Sink {
def completionStageSink[T, M](future: CompletionStage[Sink[T, M]]): Sink[T,
CompletionStage[M]] =
lazyCompletionStageSink[T, M](() => future)
+ /**
+ * Turn a `CompletionStage[Sink]` into a Sink that will consume the values
of the source when the future completes
+ * successfully. If the `CompletionStage` is completed with a failure the
stream is failed.
+ *
+ * Unlike [[completionStageSink]] and [[lazyCompletionStageSink]], this
operator materializes the inner sink as
+ * soon as the future completes, even if no elements have arrived yet. This
means empty streams complete normally
+ * rather than failing with [[NeverMaterializedException]]. Elements that
arrive before the future completes
+ * are buffered.
+ *
+ * The materialized future value is completed with the materialized value of
the future sink or failed if the
+ * future fails, upstream fails, or downstream cancels before the inner sink
is materialized.
Review Comment:
Javadoc for eagerCompletionStageSink says elements are buffered and that the
returned CompletionStage<M> fails if upstream fails / downstream cancels before
the inner sink is materialized. The Scala implementation it delegates to only
buffers a single element (it pulls once pre-materialization) and it will still
materialize and complete the outer stage successfully even if the upstream has
already failed/finished; the failure is propagated to the inner sink instead.
Please align the docs with the actual behavior, or adjust the stage
implementation to meet the documented semantics.
```suggestion
* rather than failing with [[NeverMaterializedException]]. Before the
future completes, at most one element is
* pre-fetched from upstream and buffered.
*
* The materialized future value is completed with the materialized value
of the inner sink once it has been
* materialized, or failed if the `CompletionStage[Sink]` itself fails or
if materialization of the inner sink fails.
* Upstream failures or downstream cancellations that occur before the
inner sink is materialized are propagated
* through the inner sink and therefore reflected in its materialized
value, rather than failing this operator
* with [[NeverMaterializedException]].
```
##########
docs/src/main/paradox/stream/operators/Sink/eagerFutureSink.md:
##########
@@ -0,0 +1,35 @@
+# Sink.eagerFutureSink
+
+Materializes the inner sink when the future completes, even if no elements
have arrived yet.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.eagerFutureSink](Sink$) {
scala="#eagerFutureSink[T,M](future:scala.concurrent.Future[org.apache.pekko.stream.scaladsl.Sink[T,M]]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]"
}
+
+
+## Description
+
+Turn a `Future[Sink]` into a Sink that will consume the values of the source
when the future completes
+successfully. If the `Future` is completed with a failure the stream is failed.
+
+Unlike @ref:[futureSink](futureSink.md) and
@ref:[lazyFutureSink](lazyFutureSink.md), this operator materializes the inner
sink as soon as the future
+completes, even if no elements have arrived yet. This means empty streams
complete normally rather than failing
+with `NeverMaterializedException`. Elements that arrive before the future
completes are buffered.
+
+The materialized future value is completed with the materialized value of the
future sink or failed if the
+future fails, upstream fails, or downstream cancels before the inner sink is
materialized.
Review Comment:
This doc states that the materialized future is failed if upstream fails /
downstream cancels before the inner sink is materialized and that elements
arriving before future completion are buffered. In the current EagerFutureSink
stage, the outer Future[M] is completed successfully once the future sink is
materialized (even if upstream already failed/finished), and only a single
element is buffered (the stage pulls once until the future resolves). Please
update this description to match the implemented semantics (or change the stage
to buffer more / fail the outer future on early upstream termination as
described here).
```suggestion
with `NeverMaterializedException`. At most one element that arrives before
the future completes is buffered.
The materialized future value is completed with the materialized value of
the future sink as soon as the inner
sink has been materialized and is failed only if the future fails (or if
materialization of the inner sink fails).
```
##########
docs/src/main/paradox/stream/operators/Sink/eagerCompletionStageSink.md:
##########
@@ -0,0 +1,31 @@
+# Sink.eagerCompletionStageSink
+
+Materializes the inner sink when the future completes, even if no elements
have arrived yet.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+
+## Description
+
+Turn a `CompletionStage<Sink>` into a Sink that will consume the values of the
source when the future completes
+successfully. If the `CompletionStage` is completed with a failure the stream
is failed.
+
+Unlike @ref:[completionStageSink](completionStageSink.md) and
@ref:[lazyCompletionStageSink](lazyCompletionStageSink.md), this operator
materializes the inner sink as soon as the future
+completes, even if no elements have arrived yet. This means empty streams
complete normally rather than failing
+with `NeverMaterializedException`. Elements that arrive before the future
completes are buffered.
+
+The materialized future value is completed with the materialized value of the
future sink or failed if the
+future fails, upstream fails, or downstream cancels before the inner sink is
materialized.
Review Comment:
This description says the materialized value fails if upstream fails /
downstream cancels before the inner sink is materialized and that elements are
buffered until future completion. The current stage only buffers a single
element and still completes the outer Future[M] successfully once the
CompletionStage resolves (even if upstream already failed/finished),
propagating the failure to the inner sink instead. Please align the
documentation with the implementation, or update the stage to honor the
documented materialized-value semantics and buffering behavior.
```suggestion
with `NeverMaterializedException`. Elements that arrive before the future
completes are buffered, up to a single
element.
The materialized future value is completed with the materialized value of
the future sink when the future
completes successfully, or failed if the future itself completes with a
failure. If upstream fails or downstream
cancels before the inner sink is materialized, that failure or cancellation
is propagated to the inner sink once
it has been materialized, but the outer materialized future still completes
when the future completes.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]