He-Pin commented on code in PR #989:
URL: https://github.com/apache/incubator-pekko/pull/989#discussion_r1460383732
##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala:
##########
@@ -1792,6 +1792,30 @@ trait FlowOps[+Out, +Mat] {
*/
def foldAsync[T](zero: T)(f: (T, Out) => Future[T]): Repr[T] = via(new
FoldAsync(zero, f))
+ /**
+ * implemented by `Fold`, only emits its result when the upstream completes,
+ * after which it also completes. Applies the given function towards
initialized `true`
+ * and next predicate result, yielding the next predicate result.
+ *
+ * If the function `predicate` throws an exception and the supervision
decision is
+ * [[pekko.stream.Supervision.Restart]] current value starts at `true` again
+ * the stream will continue.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' upstream completes
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * See also [[FlowOps.fold]]
+ */
+ def forall(predicate: Out => Boolean): Repr[Boolean] =
+ via(Fold(true, (acc: Boolean, element) => acc && predicate(element)))
Review Comment:
add `.name("forAll")`, impelment it with `fold` has once drawback, which is
we can't do a quick cancel after any elemement doesn't match the predicate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]