He-Pin commented on code in PR #989: URL: https://github.com/apache/incubator-pekko/pull/989#discussion_r1468539657
########## stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala: ########## @@ -339,6 +340,47 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } } + "The forall sink" must { + + "completes with `ture` when all elements match" in { + Source(1 to 4) + .runWith(Sink.forall(_ > 0)) + .futureValue shouldBe true + } + + "completes with `false` when any element match" in { + Source(1 to 4) + .runWith(Sink.forall(_ > 2)) + .futureValue shouldBe false + } + + "completes with `true` if the stream is empty" in { + Source.empty[Int] + .runWith(Sink.forall(_ > 2)) + .futureValue shouldBe true + } + + "completes with `Failure` if the stream failed" in { + Source.failed[Int](new RuntimeException("Oops")) + .runWith(Sink.forall(_ > 2)) + .failed.futureValue shouldBe a[RuntimeException] + } + + "completes with `true` with restart strategy" in { + val sink = Sink.forall[Int](elem => { + if (elem == 2) { + throw new RuntimeException("Oops") + } + elem > 0 + }).withAttributes(supervisionStrategy(Supervision.restartingDecider)) + + Source(1 to 2) + .runWith(sink) + .futureValue shouldBe true Review Comment: @pjfanning The requested test has been added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For additional commands, e-mail: notifications-h...@pekko.apache.org