He-Pin commented on code in PR #989:
URL: https://github.com/apache/incubator-pekko/pull/989#discussion_r1465359580

##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala:
##########
@@ -447,6 +440,28 @@ object Sink {
   def foldAsync[U, T](zero: U)(f: (U, T) => Future[U]): Sink[T, Future[U]] =
     Flow[T].foldAsync(zero)(f).toMat(Sink.head)(Keep.right).named("foldAsyncSink")
 
+  /**
+   * A `Sink` that will test the given predicate `p` for every received element and
+   * 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for all elements;
+   * 2. completes and returns [[scala.concurrent.Future]] of `true` if the stream is empty (i.e. completes before signalling any elements);
+   * 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is false for any element.
+   *
+   * The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false`
+   * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Completes when''' upstream completes or the predicate `p` returns `false`
+   *
+   * '''Backpressures when''' the invocation of predicate `p` has not yet completed
+   *
+   * '''Cancels when''' predicate `p` returns `false`
+   */
+  def forall[T](p: T => Boolean): Sink[T, Future[Boolean]] =
+    Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && p(_))
+      .toMat(Sink.head)(Keep.right)
+      .named("forallSink")

Review Comment:
   the implementation makes use of `foldWhile`.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
For additional commands, e-mail: notifications-h...@pekko.apache.org