He-Pin commented on code in PR #1823: URL: https://github.com/apache/pekko/pull/1823#discussion_r2051745604
########## stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala: ########## @@ -930,6 +931,47 @@ object Source { def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]] = Source.fromGraph(new BoundedSourceQueueStage[T](bufferSize)) + /** + * Creates a Source that will immediately execute the provided function `f` with a [[BoundedSourceQueue]] when materialized. + * This allows defining element production logic at Source creation time. + * + * The function `f` can push elements to the stream using the provided queue. The queue behaves the same as in [[Source.queue]]: + * <br> + * - Elements are emitted when there is downstream demand, buffered otherwise + * <br> + * - Elements are dropped if the buffer is full + * <br> + * - Buffered elements are discarded if downstream terminates + * + * If the function `f` throws an exception, the queue will be failed and the exception will be propagated to the stream. + * + * Example usage: + * {{{ + * Source.create[Int](10) { queue => + * // This code is executed when the source is materialized + * queue.offer(1) + * queue.offer(2) + * queue.offer(3) + * queue.complete() + * } Review Comment: To which? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For additional commands, e-mail: notifications-h...@pekko.apache.org