He-Pin commented on code in PR #989:
URL: https://github.com/apache/incubator-pekko/pull/989#discussion_r1465359580


##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala:
##########
@@ -447,6 +440,28 @@ object Sink {
   def foldAsync[U, T](zero: U)(f: (U, T) => Future[U]): Sink[T, Future[U]] =
     
Flow[T].foldAsync(zero)(f).toMat(Sink.head)(Keep.right).named("foldAsyncSink")
 
+  /**
+   * A `Sink` that will test the given predicate `p` for every received 
element and
+   *  1. completes and returns [[scala.concurrent.Future]] of `true` if the 
predicate is true for all elements;
+   *  2. completes and returns [[scala.concurrent.Future]] of `true` if the 
stream is empty (i.e. completes before signalling any elements);
+   *  3. completes and returns [[scala.concurrent.Future]] of `false` if the 
predicate is false for any element.
+   *
+   * The materialized value [[scala.concurrent.Future]] will be completed with 
the value `true` or `false`
+   * when the input stream ends, or completed with `Failure` if there is a 
failure signaled in the stream.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Completes when''' upstream completes or the predicate `p` returns 
`false`
+   *
+   * '''Backpressures when''' the invocation of predicate `p` has not yet 
completed
+   *
+   * '''Cancels when''' predicate `p` returns `false`
+   */
+  def forall[T](p: T => Boolean): Sink[T, Future[Boolean]] =
+    Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && p(_))
+      .toMat(Sink.head)(Keep.right)
+      .named("forallSink")

Review Comment:
   the implementation makes use of `foldWhile`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org
For additional commands, e-mail: notifications-h...@pekko.apache.org

Reply via email to