He-Pin commented on code in PR #1937: URL: https://github.com/apache/pekko/pull/1937#discussion_r2193924686
##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala:
##########
@@ -1779,6 +1779,46 @@ trait FlowOps[+Out, +Mat] {
   def groupedWeighted(minWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]] =
     via(GroupedWeighted[Out](minWeight, costFn))
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+   * when the result of the function is not the same as the previous element's result, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value for all elements, otherwise the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than the previous element's result
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentBy[T](f: Out => T): Repr[immutable.Seq[Out]] =
+    via(GroupedAdjacentByWeighted(f, Long.MaxValue, ConstantFun.oneLong))
+
+  /**
+   * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+   * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
+   * the `maxWeight`, a chunk is emitted.
+   *
+   * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
+   * otherwise the stage will fail.
+   *
+   * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
+   *
+   * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.2.0
+   */
+  def groupedAdjacentByWeighted[T](f: Out => T, maxWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]] =

Review Comment:
scaladsl's SubFlow extends FlowOps, so it's visible to scaladsl's Source/SubFlow/SubSource.
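
For illustration only, the chunking rule described in the scaladoc above can be sketched on a plain Scala collection, without any Pekko dependency. This is not the stage's implementation: the object `GroupedAdjacentSketch` and both helper functions are hypothetical names, the non-null check on `f` is omitted, and putting an element whose own cost exceeds `maxWeight` into a chunk by itself is an assumption of this sketch.

```scala
object GroupedAdjacentSketch {

  // Hypothetical strict-collection analogue of groupedAdjacentByWeighted.
  // A chunk is closed when the key of the next element differs from the key
  // of the previous element, or when adding the element would push the
  // accumulated weight above maxWeight.
  def groupedAdjacentByWeighted[A, K](xs: Seq[A], maxWeight: Long)(
      f: A => K,
      costFn: A => Long): Vector[Vector[A]] = {
    val (chunks, current, _) =
      xs.foldLeft((Vector.empty[Vector[A]], Vector.empty[A], 0L)) {
        case ((done, cur, weight), elem) =>
          val cost = costFn(elem)
          require(cost >= 0, "costFn must return a non-negative result")
          val sameKey = cur.isEmpty || f(cur.last) == f(elem)
          if (cur.nonEmpty && (!sameKey || weight + cost > maxWeight))
            (done :+ cur, Vector(elem), cost) // close the chunk, start a new one
          else
            (done, cur :+ elem, weight + cost) // keep accumulating
      }
    // Flush the last open chunk, mirroring emission on upstream completion.
    if (current.nonEmpty) chunks :+ current else chunks
  }

  // Unweighted variant: every element costs 1 and the limit is Long.MaxValue,
  // matching how the diff delegates groupedAdjacentBy to the weighted stage.
  def groupedAdjacentBy[A, K](xs: Seq[A])(f: A => K): Vector[Vector[A]] =
    groupedAdjacentByWeighted(xs, Long.MaxValue)(f, _ => 1L)

  def main(args: Array[String]): Unit = {
    println(groupedAdjacentBy(Seq("Hello", "Hi", "Greetings", "Hey"))(_.head))
    println(groupedAdjacentByWeighted(Seq(1, 1, 1, 1, 2), 2L)(identity, _ => 1L))
  }
}
```

Because the real operators are defined on `FlowOps`, the same calls should be available on `Source`, `Flow`, `SubFlow` and `SubSource` in the scaladsl, which is the point of the comment above.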